点击下载教程项目代码: netflix_hystrix_demo.zip
netflix_hystrix_demo.zip
请求合并是 Hystrix 提供的一个功能,它可以将多个在短时间内连续到达的同类型请求合并为一个请求进行处理,从而减少资源的消耗和请求的开销。
例如,在一个高并发的系统中,如果有很多客户端频繁地发起对某个服务接口的小请求,每次请求都要建立网络连接、进行数据传输等操作。通过请求合并,可以将这些小请求收集起来,一次性地进行处理,就像把多个小包裹合并成一个大包裹进行运输一样,提高了系统的整体效率。如下图:

高并发的小请求场景:当系统中有大量的小请求(如频繁查询数据库中某张表的同一行数据)时,请求合并可以减少网络开销和数据库连接等资源的占用。
对实时性要求不高的服务接口:如果服务接口对实时性要求不高,稍微的延迟是可以接受的(因为请求合并会有一个请求合并窗口时间,如每 100ms 合并请求,时间窗口为 100ms),那么采用请求合并可以有效地优化系统性能。比如,一个统计系统中,每几分钟更新一次统计数据的接口,就比较适合请求合并。
在高并发场景下,大量的小请求会频繁地建立和断开网络连接。通过请求合并,将多个请求合并为一个,大大减少了网络连接的建立和断开次数,降低了网络带宽的占用,节省了网络资源。
在一些对批量处理有优化的系统中,批量处理请求比逐个处理请求更高效。
减少了请求的数量,使得系统中各个组件(如负载均衡器、缓存服务器等)的负载降低。例如,在一个微服务架构中,负载均衡器需要将每个请求转发到相应的服务实例。请求合并后,转发的请求数量减少,负载均衡器的工作压力降低,系统的整体稳定性和性能得到提升。
由于请求需要等待一段时间来进行合并,对于那些对实时性要求较高的应用场景,这可能会导致不可接受的延迟。
注意:等待合并的时间加上批量处理的时间可能会比单个请求的处理时间更长。
请求合并需要编写额外的代码来实现请求合并逻辑。如果在使用 Netflix Hystrix 时,要定义请求合并器(Collapser)和批量请求命令(Batch Command),这增加了代码的复杂度和维护成本。开发人员需要理解请求合并的原理和机制,才能正确地编写和维护这些代码。
在系统运行过程中,要对请求合并的配置参数(如合并时间间隔、最大合并请求数等)进行优化和调整。当系统的业务逻辑发生变化时,可能需要重新评估和修改请求合并的实现方式,这增加了系统维护的难度。
下面通过根据用户 ID 查询用户信息简单接口来演示请求合并,将 100 毫秒内根据用户 ID 获取用户信息的请求进行合并,批量发送给服务端。
编写服务端代码,服务端提供一个根据用户 ID 列表批量获取用户信息的接口,如下:
package com.hxstrive.service_demo.controller;
import com.hxstrive.service_demo.dto.CommonReturn;
import com.hxstrive.service_demo.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
 * 用户控制器
 * @author hxstrive.com
 */
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
    private final static List<User> USERS = new ArrayList<>();
    static {
        USERS.add(User.builder().id(1L).name("Tom").age(20).build());
        USERS.add(User.builder().id(2L).name("Helen").age(30).build());
        USERS.add(User.builder().id(3L).name("Bill").age(40).build());
    }
    //....
    // 批量获取用户信息
    @PostMapping("/batchGetUserById")
    public List<User> batchGetUserById(@RequestBody List<Long> ids) {
        log.info("batchGetUserById() ids={}", Arrays.toString(ids.toArray()));
        return USERS.stream()
                .filter(user-> ids.contains(user.getId()))
                .collect(Collectors.toList());
    }
    //...
}(1)配置 Feign,开启 hystrix 能力,配置如下:
feign: hystrix: enabled: true
(2)编写 Feign 客户端,并且指定降级方法,如下:
package com.hxstrive.hystrix_demo.feign;
import com.hxstrive.hystrix_demo.dto.CommonReturn;
import com.hxstrive.hystrix_demo.entity.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
/**
 * Feign 客户端
 * @author hxstrive.com
 * @since 1.0.0  2024/11/19 14:24
 */
@Component
@FeignClient(name = "SERVICE-DEMO", fallback = UserServiceFallback.class)
// UserServiceFallback 是降级方法类
public interface UserServiceFeign {
    @PostMapping(value = "/user/batchGetUserById", consumes = MediaType.APPLICATION_JSON_VALUE)
    List<User> batchGetUserById(@RequestBody List<Long> ids);
}(3)编写 Feign 客户端的降级方法,该类实现了 Feign 客户端接口,如下:
package com.hxstrive.hystrix_demo.feign;
import com.hxstrive.hystrix_demo.dto.CommonReturn;
import com.hxstrive.hystrix_demo.entity.User;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * Feign 降级
 * @author hxstrive.com
 * @since 1.0.0  2024/11/19 14:25
 */
@Component
public class UserServiceFallback implements UserServiceFeign {
    @Override
    public List<User> batchGetUserById(List<Long> ids) {
        List<User> retList = new ArrayList<>();
        for(Long id : ids) {
            retList.add(null);
        }
        return retList;
    }
}(4)定义请求合并服务类,代码如下:
package com.hxstrive.hystrix_demo.quest_collapser;
import com.hxstrive.hystrix_demo.entity.User;
import com.hxstrive.hystrix_demo.feign.UserServiceFeign;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
/**
 * 请求合并服务
 * @author hxstrive.com
 */
@Service
public class QuestCollapserService {
    @Autowired
    private UserServiceFeign userServiceFeign;
    /**
     * 定义请求合并,设置合并时间窗口等属性
     * 其他服务,调用该方法
     * hystrix 会自动按照配置进行请求合并
     */
    @HystrixCollapser(batchMethod = "batchMethod", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
            collapserProperties = {
                // 进行一次请求合并的窗口时间为 100 毫秒
                @HystrixProperty(name = "timerDelayInMilliseconds", value = "100"),
                // 一次最大合并 20 个请求
                @HystrixProperty(name = "maxRequestsInBatch", value = "20")
    })
    public Future<User> collapserGetUserById(Long id) {
        // 你会发现根本不会进入这个方法体
        System.out.println("collapserGetUserById() 方法执行了..." + id);
        return null;
    }
    /**
     * 批量获取用户信息的方法,对应上面 @HystrixCollapser 注解中指定的batchMethod
     * @param ids 用户ID列表
     * @return 用户信息列表
     */
    @HystrixCommand
    public List<User> batchMethod(List<Long> ids) {
        System.out.println("batchMethod() 方法执行了..." + Arrays.toString(ids.toArray()));
        return userServiceFeign.batchGetUserById(ids);
    }
}注意:
@HystrixCollapser 修饰的方法必须返回 Future 对象,才能实现请求合并。
@HystrixCollapser 修饰的方法中的方法体不会被执行。
batchMethod 方法仅有一个参数,并且是 List 类型,List 的值为 @HystrixCollapser 修饰的方法的参数,如果 @HystrixCollapser 修饰的方法需要传递多个参数,则通过对象的方式进行传递。
batchMethod 方法必须通过 @HystrixCommand 注解修饰。
服务端方法需要支持批量操作,如 userServiceFeign.batchGetUserById()
通过笔者尝试,不能将请求合并方法放在 @RestController 修饰的类中,否则不会起作用。
(5)编写控制器,调用请求合并方法,代码如下:
package com.hxstrive.hystrix_demo.controller;
import com.hxstrive.hystrix_demo.entity.User;
import com.hxstrive.hystrix_demo.quest_collapser.QuestCollapserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Future;
/**
 * Hystrix 请求合并控制器
 * @author hxstrive.com
 */
@RestController
@RequestMapping("/demo9")
public class Demo9Controller {
    @Autowired
    private QuestCollapserService questCollapserService;
    /**
     * 在控制器中调用请求合并方法,根据用户ID获取用户信息
     */
    @GetMapping("/getUserById")
    public String getUserById() throws Exception {
        System.out.println("getUserById() 方法执行了..." + questCollapserService);
        // 这里调用了请求合并方法,
        // 为了模拟在 100 毫秒中调用多次,这里直接调用了三次
        Future<User> future1 = questCollapserService.collapserGetUserById(1L);
        Future<User> future2 = questCollapserService.collapserGetUserById(2L);
        Future<User> future3 = questCollapserService.collapserGetUserById(3L);
        // 分别获取值,这里一定要放在所有请求合并调用的后面
        // 因为 get 方法会阻塞
        System.out.println("future1: " + future1.get());
        System.out.println("future2: " + future2.get());
        System.out.println("future3: " + future3.get());
        return "success";
    }
}启动服务,如下图:

通过浏览器访问 http://localhost:8080/demo9/getUserId,效果如下图:

客户端输出日志:

服务端输出日志:

到这里一个简单的请求合并就完成了,比起非请求合并,代码确实要复杂一些。
