Eureka 客户端源码分析:服务注册、续约、下线

前面章节介绍了 Eureka Client 的启动过程,从 EnableDiscoveryClient 的注释中我们可以看到,它最终主要是用来开启 com.netflix.discovery.DiscoveryClient 的实例。源码如下:

/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
   //...
}

从 DiscoveryClient 类的注释可以看出,该类包含服务注册、服务续约、服务下线、获取服务等功能,下面将逐一进行源码分析。

服务注册

服务提供者在启动时会将自己的信息(比如IP地址、端口,运行状况指示符URL,主页等)注册到 Eureka Server,Eureka Server 收到信息后,会将数据信息存储在一个双层结构的 Map 中,其中第一层的key是服务名,第二层的key是具体服务的实例名。

下面代码通过发送 HTTP REST 注册请求到注册中心注册服务,源码如下:

/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
   logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
   EurekaHttpResponse<Void> httpResponse;
   try {
       // 主要的注册功能,实现在 RestTemplateEurekaHttpClient.register()
       httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
   } catch (Exception e) {
       logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
       throw e;
   }
   if (logger.isInfoEnabled()) {
       logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
   }
   return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

跟进进入 RestTemplateEurekaHttpClient.register() 方法,源码如下:

@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
   String urlPath = serviceUrl + "apps/" + info.getAppName();
   // 构造 HTTP 请求头
   HttpHeaders headers = new HttpHeaders();
   headers.add(HttpHeaders.ACCEPT_ENCODING, "gzip");
   headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
   // 发起 HTTP 请求
   ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.POST, new HttpEntity<>(info, headers),
           Void.class);
   // 返回响应数据
   return anEurekaHttpResponse(response.getStatusCodeValue()).headers(headersOf(response)).build();
}

服务续约

在注册完成后,Eureka 客户会每隔 30 秒发送一次心跳来续约。通过续约来告知 Eureka Server 该 Eureka 客户仍然存在,没有出现问题。正常情况下,如果 Eureka Server 在 90 秒没有收到 Eureka 客户的续约,它会将实例从其注册表中删除。建议不要更改续约间隔。

下面代码通过发送 HTTP REST 请求到注册中心进行服务续约,源码如下:

/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
   EurekaHttpResponse<InstanceInfo> httpResponse;
   try {
       // 发送心跳请求
       // 实现为 RestTemplateEurekaHttpClient.sendHeartBeat()
       httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
       logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
       // 发送心跳请求
       if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
           REREGISTER_COUNTER.increment();
           logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
           long timestamp = instanceInfo.setIsDirtyWithTime();
           // 注册服务
           boolean success = register();
           if (success) {
               instanceInfo.unsetIsDirty(timestamp);
           }
           return success;
       }
       return httpResponse.getStatusCode() == Status.OK.getStatusCode();
   } catch (Throwable e) {
       logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
       return false;
   }
}

RestTemplateEurekaHttpClient.sendHeartBeat() 的代码如下:

@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info,
       InstanceStatus overriddenStatus) {
   String urlPath = serviceUrl + "apps/" + appName + '/' + id + "?status=" + info.getStatus().toString()
           + "&lastDirtyTimestamp=" + info.getLastDirtyTimestamp().toString()
           + (overriddenStatus != null ? "&overriddenstatus=" + overriddenStatus.name() : "");
   // 发起HTTP请求
   ResponseEntity<InstanceInfo> response = restTemplate.exchange(urlPath, HttpMethod.PUT, null,
           InstanceInfo.class);
   // 解析响应
   EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(
           response.getStatusCodeValue(), InstanceInfo.class).headers(headersOf(response));

   if (response.hasBody()) {
       eurekaResponseBuilder.entity(response.getBody());
   }

   return eurekaResponseBuilder.build();
}

服务调用

服务调用本质就是获取调用服务名所对应的服务提供者实例信息,包括IP、端口等。源代码如下:

@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure) {
   return getInstancesByVipAddress(vipAddress, secure, instanceRegionChecker.getLocalRegion());
}

@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
                                                  @Nullable String region) {
   if (vipAddress == null) {
       throw new IllegalArgumentException(
               "Supplied VIP Address cannot be null");
   }
   Applications applications;
   // 判断服务提供方是否当前region,若是的话直接从localRegionApps中获取
   if (instanceRegionChecker.isLocalRegion(region)) {
       applications = this.localRegionApps.get();
   } else {
       // 否则的话从远程region获取
       applications = remoteRegionVsApps.get(region);
       if (null == applications) {
           logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
                   + "address {}.", region, vipAddress);
           return Collections.emptyList();
       }
   }

   // 从applications中获取服务名称对应的实例名称列表
   if (!secure) {
       return applications.getInstancesByVirtualHostName(vipAddress);
   } else {
       return applications.getInstancesBySecureVirtualHostName(vipAddress);
   }
}

服务下线

当服务实例正常关闭时,它会发送一个服务下线的消息给注册中心,注册中心收到信息后,会将该服务实例状态置为下线,并把该信息传播出去。该下线请求不会自动完成,它需要调用以下代码:

DiscoveryManager.getInstance().shutdownComponent()

DiscoveryClient 类通过 unregister() 方法实现服务下线,源码如下:

/**
* unregister w/ the eureka service.
*/
void unregister() {
   // It can be null if shouldRegisterWithEureka == false
   if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
       try {
           logger.info("Unregistering ...");
           // 发起 HTTP REST 请求,实现服务下线
           // 具体实现见 RestTemplateEurekaHttpClient.cancel() 方法
           EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
           logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
       } catch (Exception e) {
           logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
       }
   }
}

RestTemplateEurekaHttpClient.cancel() 源码如下:

@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
   String urlPath = serviceUrl + "apps/" + appName + '/' + id;
   // 发起 HTTP 请求
   ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.DELETE, null, Void.class);
   return anEurekaHttpResponse(response.getStatusCodeValue()).headers(headersOf(response)).build();
}

通过以上分析可知,服务的注册、下线等操作实际上就是通过发送 HTTP 请求到注册中心来实现的。那么这些操作的执行时机是什么时候呢?是什么时候服务注册操作会被调用?下线操作是如何被触发的?请阅读下一章……

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号