前面章节介绍了 Netflix Ribbon 的简单实例,本章节将根据上章节的实例来分析 Netflix Ribbon 执行流程。先看看 Demo 的 Java 代码:
package com.hxstrive.springcloud.ribbon_demo1;
import com.netflix.client.ClientFactory;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
import com.netflix.config.ConfigurationManager;
import com.netflix.niws.client.http.RestClient;
public class Demo1 {
public static void main(String[] args) throws Exception{
// 1.加载配置信息
ConfigurationManager.loadPropertiesFromResources("application.properties");
System.out.println(ConfigurationManager.getConfigInstance().getProperty("user.ribbon.listOfServers"));
// 2.返回名称为 user 的 RestClient 客户端
// 注意:这里的名称是在 application.properties 文件中配置 ribbon 时的前缀
// user.ribbon.listOfServers=localhost:7001,localhost:7002,localhost:7003
// 上面配置中的 user 就是客户端名称
RestClient client = (RestClient) ClientFactory.getNamedClient("user");
// 3.构建指定 URL 的 HTTP 请求
HttpRequest request = HttpRequest.newBuilder().uri("/info").build();
for (int i = 0; i < 10; i++) {
// 4.使用负载均衡算法发起 HTTP 请求
HttpResponse response = client.executeWithLoadBalancer(request);
// 打印调用状态和结果
System.out.println("Status code for " + response.getRequestedURI() + " status:"
+ response.getStatus() + " entity: " + response.getEntity(String.class));
}
}
}ConfigurationManager 主要用来做配置管理,loadPropertiesFromResources() 方法用于加载配置文件,并把配置信息写入 ConcurrentMapConfiguration,为后续的请求做准备。源码如下:
/**
* Load properties from resource file(s) into the system wide configuration
* @param path relative path of the resources
* @throws IOException
*/
public static void loadPropertiesFromResources(String path)
throws IOException {
if (instance == null) {
instance = getConfigInstance();
}
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Enumeration<URL> resources = loader.getResources(path);
if (!resources.hasMoreElements()) {
//non-existent config path. Throw an exception. Issue #150
throw new IOException("Cannot locate " + path + " as a classpath resource.");
}
while (resources.hasMoreElements()) {
URL url = resources.nextElement();
InputStream fin = url.openStream();
Properties props = ConfigurationUtils.loadPropertiesFromInputStream(fin);
// instance 类型为 ConcurrentCompositeConfiguration
if (instance instanceof AggregatedConfiguration) {
// 这里被执行
String name = getConfigName(url);
// 解析配置
ConcurrentMapConfiguration config = new ConcurrentMapConfiguration();
config.loadProperties(props);
// 将配置添加到配置列表中
((AggregatedConfiguration) instance).addConfiguration(config, name);
} else {
ConfigurationUtils.loadProperties(props, instance);
}
}
}因为 instance 类型为 ConcurrentCompositeConfiguration,而 ConcurrentCompositeConfiguration 类又实现了 AggregatedConfiguration 接口,因此将执行 if(){} 语句,因为在 properties 中设置 user.ribbon.listOfServers=localhost:7001, localhost:7002, localhost:7003,那么会把 key=user.ribbon.listOfServers,value=localhost:7001, localhost:7002, localhost:7003 的配置项添加到ConcurrentMapConfiguration 的 map,其他配置项类似。
ConcurrentCompositeConfiguration.addConfiguration() 方法代码如下:
public class ConcurrentCompositeConfiguration extends ConcurrentMapConfiguration
implements AggregatedConfiguration, ConfigurationListener, Cloneable {
//...
public void addConfiguration(AbstractConfiguration config, String name) {
if (containerConfigurationChanged) {
// 执行这里
addConfigurationAtIndex(config, name, configList.size());
} else {
addConfigurationAtIndex(config, name, configList.indexOf(containerConfiguration));
}
}
/**
* Add a configuration with a name at a particular index.
*
* @throws IndexOutOfBoundsException
*/
public void addConfigurationAtIndex(AbstractConfiguration config, String name, int index)
throws IndexOutOfBoundsException {
if (!configList.contains(config)) {
checkIndex(index);
// 将配置信息添加到配置列表
// private List<AbstractConfiguration> configList = new CopyOnWriteArrayList<AbstractConfiguration>();
// 见调试图
configList.add(index, config);
if (name != null) {
namedConfigurations.put(name, config);
}
config.addConfigurationListener(eventPropagater);
fireEvent(EVENT_CONFIGURATION_SOURCE_CHANGED, null, null, false);
} else {
logger.warn(config + " is not added as it already exits");
}
}
//...
}configList 变量用来存放所有的配置信息,如下图:

上图中,configList 变量包含了 6 种配置信息,分别是:
(1)DynamicURLConfiguration:默认配置
(2)SystemConfiguration:Java环境变量,例如:awt.toolkit、file.separator、java.version、java.ext.dirs 等等
(3)EnvironmentConfiguration:系统环境变量,例如:JAVA_HOM、PATH、OS、HOMEPATH 等等
(4)ConcurrentMapConfiguration:应用程序配置
(5)ConcurrentCompositeConfiguration:
(6)ConcurrentMapConfiguration:这个就是我们自己的配置信息,即 application.properties
上面实例中,使用如下语句获取 RestClient:
RestClient client = (RestClient) ClientFactory.getNamedClient("user");上面代码通过 client name(这里是 user)来获取 RestClient 对象。源码如下:
/**
* Return the named client from map if already created. Otherwise creates the client using the configuration returned by {@link #getNamedConfig(String)}.
* 从map返回已命名的客户端(如果已创建)。否则,使用 getNamedConfig(String) 返回的配置创建客户端。
* @throws RuntimeException if an error occurs in creating the client.
*/
public static synchronized IClient getNamedClient(String name) {
return getNamedClient(name, DefaultClientConfigImpl.class);
}
/**
* Return the named client from map if already created. Otherwise creates the client using the configuration returned by {@link #createNamedClient(String, Class)}.
* 从 map 返回已命名的客户端(如果已创建)。否则,使用 createNamedClient(String,Class) 返回的配置创建客户端。
* @throws RuntimeException if an error occurs in creating the client.
*/
public static synchronized IClient getNamedClient(String name, Class<? extends IClientConfig> configClass) {
if (simpleClientMap.get(name) != null) {
return simpleClientMap.get(name);
}
try {
return createNamedClient(name, configClass);
} catch (ClientException e) {
throw new RuntimeException("Unable to create client", e);
}
}
/**
* Creates a named client using a IClientConfig instance created off the configClass class object passed in as the parameter.
* 使用参数 configClass 类对象创建的 IClientConfig 实例创建命名客户端。
* @throws ClientException if any error occurs, or if the client with the same name already exists
*/
public static synchronized IClient createNamedClient(String name, Class<? extends IClientConfig> configClass) throws ClientException {
IClientConfig config = getNamedConfig(name, configClass);
return registerClientFromProperties(name, config);
}上面代码中,使用 getNamedConfig(name, configClass) 方法去创建 IClientConfig 对象,代码如下:
/**
* Get the client configuration given the name or create one with clientConfigClass if it does not exist. An instance of IClientConfig
* is created and {@link IClientConfig#loadProperties(String)} will be called.
*/
public static IClientConfig getNamedConfig(String name, Class<? extends IClientConfig> clientConfigClass) {
// 根据指定的名称获取 IClientConfig,明显我们指定的客户端还没有被创建
IClientConfig config = namedConfig.get(name);
if (config != null) {
return config;
} else {
try {
// 通过反射的方式创建 IClientConfig 配置对象
config = (IClientConfig) clientConfigClass.newInstance();
// 加载 name(当前为 user)为前缀的配置,然后替换默认配置
// 注意:在 loadProperties() 方法中,会先调用 loadDefaultValues() 方法去加载默认配置
config.loadProperties(name);
} catch (InstantiationException | IllegalAccessException e) {
logger.error("Unable to create named client config '{}' instance for config class {}", name,
clientConfigClass, e);
return null;
}
config.loadProperties(name);
IClientConfig old = namedConfig.putIfAbsent(name, config);
if (old != null) {
config = old;
}
return config;
}
}最后,通过调用 ClientFactory 类的 registerClientFromProperties() 方法创建 IClient,代码如下:
/**
* Utility method to create client and load balancer (if enabled in client config) given the name and client config.
* Instances are created using reflection (see {@link #instantiateInstanceWithClientConfig(String, IClientConfig)}
*
* @param restClientName
* @param clientConfig
* @throws ClientException if any errors occurs in the process, or if the client with the same name already exists
*/
public static synchronized IClient<?, ?> registerClientFromProperties(String restClientName, IClientConfig clientConfig) throws ClientException {
IClient<?, ?> client = null;
ILoadBalancer loadBalancer = null;
if (simpleClientMap.get(restClientName) != null) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"A Rest Client with this name is already registered. Please use a different name");
}
try {
String clientClassName = (String) clientConfig.getProperty(CommonClientConfigKey.ClientClassName);
client = (IClient<?, ?>) instantiateInstanceWithClientConfig(clientClassName, clientConfig);
boolean initializeNFLoadBalancer = Boolean.parseBoolean(clientConfig.getProperty(
CommonClientConfigKey.InitializeNFLoadBalancer, DefaultClientConfigImpl.DEFAULT_ENABLE_LOADBALANCER).toString());
if (initializeNFLoadBalancer) {
loadBalancer = registerNamedLoadBalancerFromclientConfig(restClientName, clientConfig);
}
if (client instanceof AbstractLoadBalancerAwareClient) {
((AbstractLoadBalancerAwareClient) client).setLoadBalancer(loadBalancer);
}
} catch (Throwable e) {
String message = "Unable to InitializeAndAssociateNFLoadBalancer set for RestClient:"
+ restClientName;
logger.warn(message, e);
throw new ClientException(ClientException.ErrorType.CONFIGURATION,
message, e);
}
simpleClientMap.put(restClientName, client);
Monitors.registerObject("Client_" + restClientName, client);
logger.info("Client Registered:" + client.toString());
return client;
}在实例中,使用如下代码创建一个 HTTP 请求,代码如下:
HttpRequest request = HttpRequest.newBuilder().uri("/info").build();该 HTTP 请求的地址为 “/info”。HttpRequest 是 ribbon-httpclient 提供的 HTTP 通信工具,它将作为 RestClient 发起 HTTP 请求的参数。
在实例中,使用如下代码发起一个 HTTP 请求,代码如下:
HttpResponse response = client.executeWithLoadBalancer(request);
当调用者希望将请求分派给负载均衡器选择的服务器时,应使用 executeWithLoadBalancer() 方法,而不是在请求的 URI 中指定服务器。它通过调用 reconstructURIWithServer(com.netflix.loadbalancer.Server, java.net.URI),然后调用executeWithLoadBalancer(ClientRequest, com.netflix.client.config.IClientConfig) 来计算最终的 URI。
源码如下:
/**
* This method should be used when the caller wants to dispatch the request to a server chosen by
* the load balancer, instead of specifying the server in the request's URI.
* It calculates the final URI by calling {@link #reconstructURIWithServer(com.netflix.loadbalancer.Server, java.net.URI)}
* and then calls {@link #executeWithLoadBalancer(ClientRequest, com.netflix.client.config.IClientConfig)}.
*
* @param request request to be dispatched to a server chosen by the load balancer. The URI can be a partial
* URI which does not contain the host name or the protocol.
*/
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// 构建 LoadBalancerCommand
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
// 创建一个 Observable 对象,一旦订阅了该 Observable,它将与负载均衡器选择的服务器异步执行网络调用。
// 如果 RetryHandler 指示可重试任何错误,则该错误将由函数在内部使用,并且所订阅的 Observer 将不会观
// 察到这些错误返回的 Observable。
// 如果重试次数超过了允许的最大值,则返回的 Observable 将发出最终错误。
// 否则,将发出执行和重试期间的第一个成功结果。
return command.submit(
// 为指定的服务提供 rx.Observable
// 使用 com.netflix.loadbalancer.reactive.LoadBalancerCommand
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
// 发起请求
// Observable.just() 方法来自 rxjava 框架,它将返回一个 Observable 对象
// rxjava 是一个异步框架,功能和 handler 类似,特点是链式调用,逻辑简单
// execute() 方法执行请求并返回响应。 不会重试,并且直接抛出所有异常。
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
// 创建 RetryHandler
// RetryHandler 是一种处理程序,用于确定异常是否可供负载平衡器检索,以及异常或错误响应是否应被
// 视为与电路相关的故障,以便负载平衡器可以避免此类服务器。
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
// 创建负载均衡器命令 Builder
LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
.withLoadBalancerContext(this)
.withRetryHandler(handler)
.withLoadBalancerURI(request.getUri());
// 默认情况下什么都不做,子类可以实现它实现自定义 builder
customizeLoadBalancerCommandBuilder(request, config, builder);
return builder.build();
}
protected void customizeLoadBalancerCommandBuilder(final S request, final IClientConfig config,
final LoadBalancerCommand.Builder<T> builder) {
// do nothing by default, give a chance to its derived class to customize the builder
}注意,LoadBalancerCommand 是一个命令,用于从负载均衡器的执行中生成 Observable 对象。负载均衡器主要负责以下工作:
选择一个服务器
调用 call(com.netflix.loadbalancer.Server) 方法
如果有的话,调用 ExecutionListener
重试异常,由 com.netflix.client.RetryHandler 控制
向 com.netflix.loadbalancer.LoadBalancerStats 提供反馈
实例最后则是解析调用服务后返回的 HttpResponse 对象。