响应时间加权策略根据服务实例的响应时间来分配请求权重。响应时间越长,权重越低,被选择到的概率就越低。响应时间越短,权重越高,被选择到的概率就越高。
该策略的核心思想是通过测量每个服务实例的响应时间,为响应时间较短的实例分配更高的权重,从而在后续的请求分配中更倾向于选择响应速度快的实例。
响应时间测量:在运行过程中,ResponseTimeWeightedRule 会持续监测每个服务实例的响应时间。当客户端向服务实例发送请求时,开始计时,当收到响应后,记录响应时间。
权重计算:根据记录的响应时间,计算每个服务实例的权重。通常,响应时间越短,权重越高。例如,可以使用反比例函数来计算权重,响应时间为 t 的实例权重可以设置为 1/t 或者其他类似的计算方式。
请求分配:在选择服务实例时,根据权重进行随机选择。权重高的实例被选中的概率更大,这样可以使得更多的请求被分配到响应速度快的实例上。
动态调整:随着服务实例的响应时间变化,权重会动态调整。如果一个实例的响应时间突然变长,它的权重会降低,后续请求分配到该实例的概率也会减少。反之,如果一个实例的响应时间变短,它的权重会增加,被选中的概率也会提高。
自适应:能够自动适应服务实例的性能变化,无需手动干预。在服务实例的性能波动较大的情况下,仍然可以有效地分配请求,提高系统的整体性能和响应速度。
在 application.properties 中,使用如下配置:
# 已过期,不推荐使用了 user.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.ResponseTimeWeightedRule # 推荐使用 user.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.WeightedResponseTimeRule
将在名为 user 的客户端上开启响应时间加权策略。
Ribbon 响应时间加权策略通过 com.netflix.loadbalancer.ResponseTimeWeightedRule 和 com.netflix.loadbalancer.WeightedResponseTimeRule 类实现,前者已经废弃,推荐使用后者,下面将对两个类进行分析。
分析源码前,先看看它是如何计算权重的,如下图:

上图中,“总响应时间”指所有服务器平均响应时间的总和。权重计算公式“总响应时间 - 当前服务器的平均响应时间”,由此可以看出,响应时间越短,权重越高。
通过权重选择服务器的步骤如下:
(1)获取最大权重值。
(2)生成一个随机数,该随机数位于 0 ~ 最大权重值之间,不包含最大权重值。
(3)迭代所有服务器,逐一将服务器权重和随机数进行比较,如果当前服务器权重大于等于随机值,则选择该服务器。
接下来仔细看源码。

代码如下:
package com.netflix.loadbalancer;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** @deprecated */
public class ResponseTimeWeightedRule extends RoundRobinRule {
public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY;
public static final int DEFAULT_TIMER_INTERVAL = 30000;
private int serverWeightTaskTimerInterval = 30000;
private static final Logger logger;
private volatile List<Double> accumulatedWeights = new ArrayList();
private final Random random = new Random();
protected Timer serverWeightTimer = null;
protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
String name = "unknown";
public ResponseTimeWeightedRule() {
}
public ResponseTimeWeightedRule(ILoadBalancer lb) {
super(lb);
}
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof BaseLoadBalancer) {
this.name = ((BaseLoadBalancer)lb).getName();
}
this.initialize(lb);
}
// 初始化
void initialize(ILoadBalancer lb) {
// 如果存在定时任务,则先取消
if (this.serverWeightTimer != null) {
this.serverWeightTimer.cancel();
}
// 创建定时任务
this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true);
this.serverWeightTimer.schedule(new DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval);
// 在定时任务出发前先计算一次权重
ServerWeight sw = new ServerWeight();
sw.maintainWeights();
// 定义一个钩子,用于取消定时任务
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
ResponseTimeWeightedRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-{}", ResponseTimeWeightedRule.this.name);
ResponseTimeWeightedRule.this.serverWeightTimer.cancel();
}
}));
}
public void shutdown() {
if (this.serverWeightTimer != null) {
logger.info("Stopping NFLoadBalancer-serverWeightTimer-{}", this.name);
this.serverWeightTimer.cancel();
}
}
@SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
} else {
Server server = null;
while(server == null) {
// 获取所有权重,权重和服务器列表的位置一一对应的
List<Double> currentWeights = this.accumulatedWeights;
// 如果线程已中断,直接返回
if (Thread.interrupted()) {
return null;
}
// 获取所有服务器
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
// 服务器为空,直接返回
return null;
}
int serverIndex = 0;
// ??? 没看懂
// 列表最后一个元素如何使最大权重值,List 是无须列表,有没有对其进行明显的排序操作
double maxTotalWeight = currentWeights.size() == 0 ? 0.0 : (Double)currentWeights.get(currentWeights.size() - 1);
if (maxTotalWeight < 0.001) {
server = super.choose(this.getLoadBalancer(), key);
} else {
// 获取一个在
double randomWeight = this.random.nextDouble() * maxTotalWeight;
// 遍历权重列表,确定服务器下标
int n = 0; // 选中的服务器下标
for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
Double d = (Double)var13.next();
if (d >= randomWeight) {
serverIndex = n;
break;
}
}
// 获取服务器信息
server = (Server)allList.get(serverIndex);
}
if (server == null) {
// 如果没有选中,则让出 CPU
Thread.yield();
} else {
if (server.isAlive()) {
// 选中了服务器,且存活,则直接返回
return server;
}
server = null;
}
}
return server;
}
}
void setWeights(List<Double> weights) {
this.accumulatedWeights = weights;
}
public void initWithNiwsConfig(IClientConfig clientConfig) {
super.initWithNiwsConfig(clientConfig);
this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
}
static {
WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = WeightedResponseTimeRule.WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY;
logger = LoggerFactory.getLogger(ResponseTimeWeightedRule.class);
}
// 服务器权重,计算每个服务器的权重
class ServerWeight {
ServerWeight() {
}
public void maintainWeights() {
// 获取负载均衡器,维护了所有的服务器信息
ILoadBalancer lb = ResponseTimeWeightedRule.this.getLoadBalancer();
if (lb != null) {
// 设置服务权重计算开始标志,避免多线程同时计算
if (ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
try {
ResponseTimeWeightedRule.logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
// 获取服务器状态对象集
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats != null) {
// 临时保存所有服务器响应平均响应时间的总时间
double totalResponseTime = 0.0;
// 计算总时间
ServerStats ss;
for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
Server server = (Server)var6.next();
ss = stats.getSingleServerStat(server);
}
// 计算单个服务器权重
Double weightSoFar = 0.0;
// 计算出来的所有服务器权重
List<Double> finalWeights = new ArrayList();
// 迭代所有服务器
Iterator var20 = nlb.getAllServers().iterator();
while(var20.hasNext()) {
Server serverx = (Server)var20.next();
ServerStats ssx = stats.getSingleServerStat(serverx);
// 总时间减去当前服务器平均响应时间,的到权重值
// 如果服务器平均响应时间越小,得到的 weight 值就越大
// 见 choose 方法中的使用
double weight = totalResponseTime - ssx.getResponseTimeAvg();
weightSoFar = weightSoFar + weight;
finalWeights.add(weightSoFar);
}
ResponseTimeWeightedRule.this.setWeights(finalWeights);
return;
}
} catch (Exception var16) {
ResponseTimeWeightedRule.logger.error("Error calculating server weights", var16);
return;
} finally {
// 服务器权重计算完后,将标志恢复
ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.set(false);
}
}
}
}
}
// 权重更新定时任务,计算每个服务器的权重
class DynamicServerWeightTask extends TimerTask {
DynamicServerWeightTask() {
}
public void run() {
ServerWeight serverWeight = ResponseTimeWeightedRule.this.new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception var3) {
ResponseTimeWeightedRule.logger.error("Error running DynamicServerWeightTask for {}", ResponseTimeWeightedRule.this.name, var3);
}
}
}
}
代码如下:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.netflix.loadbalancer;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WeightedResponseTimeRule extends RoundRobinRule {
// 成员变量也没有变化
// 和 ResponseTimeWeightedRule 一致
public WeightedResponseTimeRule() {}
public WeightedResponseTimeRule(ILoadBalancer lb) {}
public void setLoadBalancer(ILoadBalancer lb) {}
// 和 ResponseTimeWeightedRule 一致
void initialize(ILoadBalancer lb) {}
// 和 ResponseTimeWeightedRule 一致
public void shutdown() {}
// 新增,返回权重列表的一个不可修改副本列表
List<Double> getAccumulatedWeights() {
return Collections.unmodifiableList(this.accumulatedWeights);
}
@SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
} else {
Server server = null;
while(server == null) {
List<Double> currentWeights = this.accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
double maxTotalWeight = currentWeights.size() == 0 ? 0.0 : (Double)currentWeights.get(currentWeights.size() - 1);
// 这里条件变了
if (!(maxTotalWeight < 0.001) && serverCount == currentWeights.size()) {
// 计算权重的方式一致
double randomWeight = this.random.nextDouble() * maxTotalWeight;
int n = 0;
for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
Double d = (Double)var13.next();
if (d >= randomWeight) {
serverIndex = n;
break;
}
}
server = (Server)allList.get(serverIndex);
} else {
server = super.choose(this.getLoadBalancer(), key);
if (server == null) {
return server;
}
}
if (server == null) {
Thread.yield();
} else {
if (server.isAlive()) {
return server;
}
server = null;
}
}
return server;
}
}
// 和 ResponseTimeWeightedRule 一致
void setWeights(List<Double> weights) {}
public void initWithNiwsConfig(IClientConfig clientConfig) {}
class ServerWeight {}
class DynamicServerWeightTask extends TimerTask {}
}通过上面分析,对比 ResponseTimeWeightedRule 和 WeightedResponseTimeRule 的代码,几乎一样,没有太多的变化。
提高性能:通过优先选择响应速度快的服务实例,可以减少请求的平均响应时间,提高系统的性能。
负载均衡优化:相比其他简单的负载均衡策略,如轮询或随机选择,ResponseTimeWeightedRule能够更好地根据服务实例的实际性能进行请求分配,实现更优化的负载均衡。
初始阶段不稳定:在系统刚开始运行或者新的服务实例加入时,由于没有足够的响应时间数据,权重计算可能不准确,导致请求分配不够优化。
对异常值敏感:如果某个服务实例出现偶尔的异常高响应时间,可能会对其权重产生较大影响,导致在一段时间内被分配的请求较少。
对响应时间要求较高的系统:如实时交易系统、在线游戏等,需要快速响应的场景下,响应时间加权策略可以确保请求被分配到性能较好的服务实例上,提高用户体验。
服务实例性能差异较大的环境:当不同的服务实例在硬件配置、网络环境等方面存在较大差异时,该策略可以根据实际性能进行请求分配,充分发挥性能较好的实例的作用。