Spring Cloud LoadBalancer 教程

LoadBalancer 执行流程解析

本章将以“Spring Cloud LoadBalancer 快速上手:入门案例”为基础,解析 RestTemplate 结合 @LoadBalanced 注解实现负载均衡调用的执行流程。了解整个执行流程有助于我们更好的理解 Spring Cloud LoadBalancer,为后续深入学习它打下基础。

整体流程如下:

  1. 使用浏览器访问 http://localhost:8080/getData 地址,请求会被 LoadBalancerInterceptor 拦截。调用 intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) 方法,识别出 URL 中的服务名(如 USER-SERVICE),而非具体“IP: 端口”。如下图:

Spring Cloud LoadBalancer 执行流程解析

  1. 解析到服务名后继续调用 LoadBalancerClient(实际类型是 BlockingLoadBalancerClient)的 execute(String serviceId, LoadBalancerRequest<T> request) 方法,根据服务 ID 执行负载均衡请求。方法内部会自动选择合适的服务实例,并将请求转发到该实例。方法定义如下:

/**
 * @param serviceId 服务名称(用于从服务发现中获取实例列表)
 * @param request   封装了实际请求逻辑的负载均衡请求对象
 * @param <T>       请求返回值类型
 * @return 请求执行的结果
 * @throws IOException 当请求执行过程中发生I/O错误时抛出
 */
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
  1. 进入到 BlockingLoadBalancerClient 的 execute() 方法,根据服务ID自动选择实例并执行请求,包含负载均衡生命周期管理、实例选择及请求转发逻辑,代码如下:

public class BlockingLoadBalancerClient implements LoadBalancerClient {
    // 负载均衡器工厂,用于根据服务ID获取对应的响应式负载均衡器
    private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

    //...(其他属性和方法省略)

    /**
     * 根据服务ID自动选择实例并执行请求
     *
     * @param serviceId 服务名称(用于定位服务实例)
     * @param request   封装了实际请求逻辑的负载均衡请求对象
     * @param <T>       请求返回值类型
     * @return 请求执行结果
     * @throws IOException 当请求执行过程中发生I/O错误时抛出
     * @throws IllegalStateException 当没有可用服务实例时抛出
     */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        // 1. 获取服务的负载均衡提示信息(可用于自定义实例筛选逻辑)
        String hint = this.getHint(serviceId);
        
        // 2. 适配请求对象,包装原始请求和上下文信息(如超时上下文)
        LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = 
            new LoadBalancerRequestAdapter(request, this.buildRequestContext(request, hint));
        
        // 3. 获取当前服务支持的负载均衡生命周期处理器(用于触发请求生命周期事件)
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId);
        
        // 4. 触发所有生命周期处理器的"请求开始"事件(如记录 metrics、日志等)
        supportedLifecycleProcessors.forEach((lifecycle) -> {
            lifecycle.onStart(lbRequest);
        });
        
        // 5. 核心:通过负载均衡策略选择一个可用的服务实例
        ServiceInstance serviceInstance = this.choose(serviceId, lbRequest);
        
        // 6. 处理无可用实例的情况
        if (serviceInstance == null) {
            // 触发"请求完成(失败)"事件,标记状态为丢弃
            supportedLifecycleProcessors.forEach((lifecycle) -> {
                lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, new EmptyResponse()));
            });
            throw new IllegalStateException("No instances available for " + serviceId);
        } else {
            // 7. 若实例存在,使用选中的实例执行请求
            return this.execute(serviceId, serviceInstance, lbRequest);
        }
    }

    //...(其他方法省略)
}

关键代码 this.choose(serviceId, lbRequest) 解析如下:

/**
 * 根据服务ID和请求信息选择合适的服务实例(负载均衡核心逻辑的实例选择步骤)
 * 
 * @param serviceId 服务名称,用于定位对应的负载均衡器和服务实例列表
 * @param request 包含请求上下文的对象,可能影响实例选择策略(如携带元数据用于筛选)
 * @param <T> 请求的泛型类型
 * @return 选中的服务实例,若无可用实例则返回null
 */
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    // 1. 根据服务ID从工厂获取对应的响应式负载均衡器
    // 负载均衡器与服务一一对应,包含该服务的实例列表和选择策略
    ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId);
    
    // 2. 若未获取到负载均衡器(通常因服务未注册),直接返回null
    if (loadBalancer == null) {
        return null;
    } else {
        // 3. 调用负载均衡器的choose方法选择实例(响应式操作)
        // 通过Mono.from()包装响应式流,block()阻塞获取结果(适配同步场景)
        Response<ServiceInstance> loadBalancerResponse = 
            (Response)Mono.from(loadBalancer.choose(request)).block();
        
        // 4. 从响应中提取选中的服务实例并返回(无实例时返回null)
        return loadBalancerResponse == null ? null : (ServiceInstance)loadBalancerResponse.getServer();
    }
}

注意:loadBalancer.choose(request) 方法有具体的负载均衡算法去实现,如 RandomLoadBalancer、RoundRobinLoadBalancer 等。

  1. 成功选择到可用实例后,调用实例的 execute() 方法去执行请求:

this.execute(serviceId, serviceInstance, lbRequest);

下图是 serviceInstance 实例的信息截图,这里的 execute() 将利用这些信息发起实际请求:

Spring Cloud LoadBalancer 执行流程解析

方法 execte() 的代码如下:

/**
 * 使用指定的服务实例执行负载均衡请求的核心方法
 * 负责触发请求生命周期事件、执行实际请求逻辑,并处理请求结果/异常
 *
 * @param serviceId        服务名称(用于定位对应的负载均衡生命周期处理器)
 * @param serviceInstance  已选定的具体服务实例(包含IP、端口等可直接调用的信息)
 * @param request          封装了实际业务请求逻辑的负载均衡请求对象
 * @param <T>              请求返回值的泛型类型
 * @return 业务请求执行后的返回结果
 * @throws IOException      当业务请求执行过程中发生I/O错误时抛出
 * @throws IllegalArgumentException 当传入的服务实例为null时抛出
 */
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
    // 1. 校验服务实例合法性:若实例为null,直接抛出参数非法异常
    if (serviceInstance == null) {
        throw new IllegalArgumentException("Service Instance cannot be null, serviceId: " + serviceId);
    } else {
        // 2. 封装服务实例为负载均衡标准响应对象(便于生命周期处理器使用)
        DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
        // 3. 获取当前服务支持的负载均衡生命周期处理器(用于事件回调,如metrics统计、日志记录)
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId);
        // 4. 适配请求对象:若传入的request已实现Request接口则直接使用,否则包装为DefaultRequest
        Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest();
        // 5. 触发所有生命周期处理器的"请求开始"事件(通知请求即将发送到指定实例)
        supportedLifecycleProcessors.forEach((lifecycle) -> {
            lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance));
        });

        try {
            // 6. 核心逻辑:执行实际业务请求(调用request的apply方法,传入服务实例)
            // 【后续分析】
            T response = request.apply(serviceInstance);
            // 7. 获取客户端响应结果(适配不同请求类型的响应格式,如RestTemplate、WebClient的响应)
            Object clientResponse = this.getClientResponse(response);
            // 8. 触发所有生命周期处理器的"请求完成(成功)"事件(携带成功状态和响应数据)
            supportedLifecycleProcessors.forEach((lifecycle) -> {
                lifecycle.onComplete(new CompletionContext(Status.SUCCESS, lbRequest, defaultResponse, clientResponse));
            });
            // 9. 返回业务请求的执行结果
            return response;
        } catch (IOException var9) {
            // 10. 捕获I/O异常(如网络连接失败):触发"请求完成(失败)"事件,携带异常信息
            supportedLifecycleProcessors.forEach((lifecycle) -> {
                lifecycle.onComplete(new CompletionContext(Status.FAILED, var9, lbRequest, defaultResponse));
            });
            // 11. 重新抛出I/O异常,交由上层处理
            throw var9;
        } catch (Exception var10) {
            // 12. 捕获其他非I/O异常(如业务异常):触发"请求完成(失败)"事件
            supportedLifecycleProcessors.forEach((lifecycle) -> {
                lifecycle.onComplete(new CompletionContext(Status.FAILED, var10, lbRequest, defaultResponse));
            });
            // 13. 将非运行时异常包装为运行时异常抛出(避免方法签名增加过多异常声明)
            ReflectionUtils.rethrowRuntimeException(var10);
            // 14. 逻辑兜底:因上一行已抛出异常,此return实际不会执行
            return null;
        }
    }
}
  1. 调用 Request 的 apply() 方法(实际为 LoadBalancerRequestAdapter 对象),对指定的服务实例(ServiceInstance)执行实际的业务请求。代码如下:

/**
 * 负载均衡请求适配器,实现了 LoadBalancerRequest 接口并继承 DefaultRequest,
 * 用于适配原始负载均衡请求(LoadBalancerRequest)与请求上下文(RC),
 * 同时支持请求的委托执行。
 *
 * @param <T>  请求执行后的返回类型
 * @param <RC> 请求上下文类型,用于携带额外的请求元数据或配置信息
 */
public class LoadBalancerRequestAdapter<T, RC> extends DefaultRequest<RC> implements LoadBalancerRequest<T> {

    /**
     * 被委托的原始负载均衡请求对象,实际的请求逻辑由该对象实现
     */
    private final LoadBalancerRequest<T> delegate;

    //...

    /**
     * 执行负载均衡请求,委托给原始请求对象处理
     * 实现了 LoadBalancerRequest 接口的核心方法,实际逻辑由被委托的 delegate 执行
     *
     * @param instance 经过负载均衡选择的服务实例
     * @return 请求执行结果,类型为T
     * @throws Exception 当请求执行过程中发生异常时抛出
     */
    public T apply(ServiceInstance instance) throws Exception {
        return this.delegate.apply(instance);
    }
}

继续跟踪 delegate 实例(实际类型为 BlockingLoadBalancerRequest)的 apply() 方法,如下:

/**
 * 执行针对选中服务实例的HTTP请求,处理请求转换并执行实际调用
 * 实现了对原始HTTP请求的包装与转换,确保请求正确发送到选中的服务实例
 * 
 * @param instance 经过负载均衡选择的目标服务实例(包含IP、端口等信息)
 * @return 服务实例返回的HTTP响应对象(ClientHttpResponse)
 * @throws Exception 当请求转换或执行过程中发生异常时抛出
 */
public ClientHttpResponse apply(ServiceInstance instance) throws Exception {
    // 1. 包装原始HTTP请求
    HttpRequest serviceRequest = new ServiceRequestWrapper(
        this.clientHttpRequestData.request,  // 原始HTTP请求数据
        instance,                            // 选中的服务实例
        this.loadBalancer                    // 负载均衡器(用于URI重构)
    );

    // 2. 应用请求转换器(若有):对请求进行额外处理(如添加请求头、修改参数等)
    LoadBalancerRequestTransformer transformer;
    if (this.transformers != null) {
        // 遍历所有转换器,依次对请求进行转换
        for (Iterator var3 = this.transformers.iterator(); var3.hasNext(); 
             // 每次转换后更新请求对象,将转换结果作为下一次转换的输入
             serviceRequest = transformer.transformRequest((HttpRequest) serviceRequest, instance)) {
            transformer = (LoadBalancerRequestTransformer) var3.next();
        }
    }

    // 3. 执行转换后的HTTP请求:通过原始请求执行器发送请求并返回响应
    // clientHttpRequestData.execution 为原始请求执行器(如ClientHttpRequestExecution)
    return this.clientHttpRequestData.execution.execute(
        (HttpRequest) serviceRequest,  // 转换后的HTTP请求
        this.clientHttpRequestData.body // 请求体数据
    );
}

继续跟进 execute() 方法,代码如下:

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
    // 请求工厂,用于创建实际发送请求的ClientHttpRequest实例
    private final ClientHttpRequestFactory requestFactory;
    // 请求拦截器列表,用于在请求发送前/后执行拦截逻辑(如负载均衡、日志记录等)
    private final List<ClientHttpRequestInterceptor> interceptors;
    // HTTP请求方法(如GET、POST等)
    private final HttpMethod method;
    // 请求的目标URI
    private final URI uri;

    //...(其他构造方法和属性省略)

    /**
     * 拦截式请求执行器,负责按顺序执行拦截器链,并在最后执行实际请求
     * 实现了责任链模式,依次调用每个拦截器的intercept方法
     */
    private class InterceptingRequestExecution implements ClientHttpRequestExecution {
        // 拦截器迭代器,用于遍历拦截器列表
        private final Iterator<ClientHttpRequestInterceptor> iterator;

        //...

        /**
         * 执行请求逻辑:依次调用拦截器,最后发送实际请求
         *
         * @param request 当前HTTP请求对象
         * @param body    请求体字节数组
         * @return 服务器返回的HTTP响应
         * @throws IOException 当请求发送过程中发生I/O错误时抛出
         */
        public ClientHttpResponse execute(HttpRequest request, final byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                // 1. 若存在下一个拦截器,调用其intercept方法(传递当前执行器作为参数,支持链式调用)
                ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor) this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            } else {
                // 2. 所有拦截器执行完毕,执行实际请求发送逻辑

                // 2.1 获取请求方法并通过请求工厂创建委托请求对象(实际发送请求的实例)
                // 【后续分析】 request.getURI() 获取服务地址
                HttpMethod method = request.getMethod();
                ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);

                // 2.2 复制原始请求的头信息到委托请求
                request.getHeaders().forEach((key, value) -> {
                    delegate.getHeaders().addAll(key, value);
                });

                // 2.3 复制原始请求的属性到委托请求
                request.getAttributes().forEach((key, value) -> {
                    delegate.getAttributes().put(key, value);
                });

                // 2.4 处理请求体(若存在)
                if (body.length > 0) {
                    // 调整内容长度头(若原始长度与实际body长度不一致)
                    long contentLength = delegate.getHeaders().getContentLength();
                    if (contentLength > -1L && contentLength != (long) body.length) {
                        delegate.getHeaders().setContentLength((long) body.length);
                    }

                    // 将body写入委托请求:支持流式输出和普通输出两种方式
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(new StreamingHttpOutputMessage.Body() {
                            // 写入输出流
                            public void writeTo(OutputStream outputStream) throws IOException {
                                StreamUtils.copy(body, outputStream);
                            }

                            // 标记为可重复(支持重试场景)
                            public boolean repeatable() {
                                return true;
                            }
                        });
                    } else {
                        // 直接复制body到请求体输出流
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }

                // 2.5 执行委托请求,发送到服务器并返回响应
                return delegate.execute();
            }
        }
    }
}

上面代码中,request.getURI() 语句用来获取实际的调用地址,即将服务 ID 替换成实际 IP 和端口,代码如下:

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    //...

    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(this.instance, this.getRequest().getURI());
        return uri;
    }
}

继续跟进:

public class BlockingLoadBalancerClient implements LoadBalancerClient {
    //...

    public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    //...
}

此时,发现 reconstructURI() 方法调用了 LoadBalancerUriTools 工具类的 reconstructURI() 方法,代码如下:

public static URI reconstructURI(ServiceInstance serviceInstance, URI original) {
    if (serviceInstance == null) {
        throw new IllegalArgumentException("Service Instance cannot be null.");
    } else {
        return doReconstructURI(serviceInstance, original);
    }
}

/**
 * 执行URI重构的核心方法:将原始URI中的服务名替换为服务实例的实际主机、端口和协议
 * 确保请求最终指向具体服务实例的网络地址
 *
 * @param serviceInstance 经过负载均衡选择的服务实例(包含实际主机、端口等信息)
 * @param original 原始请求URI(通常包含服务名,如 http://user-service/api)
 * @return 重构后的URI(指向具体服务实例,如 http://192.168.1.100:8081/api)
 */
private static URI doReconstructURI(ServiceInstance serviceInstance, URI original) {
    // 1. 获取服务实例的实际主机名(IP地址或域名)
    String host = serviceInstance.getHost();
    
    // 2. 确定协议方案(如http、https):
    // 优先使用服务实例元数据中指定的scheme
    // 若未指定,通过原始URI和服务实例信息计算(如根据端口推断)
    String scheme = (String) Optional.ofNullable(serviceInstance.getScheme())
        .orElse(computeScheme(original, serviceInstance));
    
    // 3. 确定端口号:
    // 基于服务实例的端口和协议方案计算(如https默认443,http默认80)
    int port = computePort(serviceInstance.getPort(), scheme);
    
    // 4. 优化判断:若重构后的主机、端口、协议与原始URI一致,直接返回原始URI(避免不必要的重建)
    if (Objects.equals(host, original.getHost()) 
        && port == original.getPort() 
        && Objects.equals(scheme, original.getScheme())) {
        return original;
    } else {
        // 5. 重建URI:
        //  标记原始URI是否包含已编码的部分(如%20表示空格),避免重复编码
        boolean encoded = containsEncodedParts(original);
        
        // 使用UriComponentsBuilder构建新URI:保留原始路径、查询参数等,仅替换scheme、host、port
        return UriComponentsBuilder.fromUri(original)
            .scheme(scheme)    // 设置协议
            .host(host)        // 设置实际主机
            .port(port)        // 设置实际端口
            .build(encoded)    // 构建时保留原始编码状态
            .toUri();          // 转换为URI对象并返回
    }
}

到这里,Spring Cloud LoadBalancer 执行的整个过程就分析完了。

注意:上面源码均是通过 IDEA 进行反编译进行分析,如果存在问题,请评论告知。

 

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号