Eureka 服务端源码分析:服务续约实现

前面章节分析了 Eureka 服务注册的源码,本章将接着分析 Eureka 服务续约的源码实现。

在服务注册完成后,Eureka 客户会每隔 30 秒发送一次心跳来续约。通过续约来告知 Eureka 服务端该 Eureka 客户仍然存在,没有出现问题。正常情况下,如果 Eureka 服务端在 90 秒没有收到 Eureka 客户的续约,它会将实例从其注册表中删除。

注意:建议不要轻易更改续约间隔。

先找到 AbstractInstanceRegistry 类,在 renew() 方法中打一个断点,然后分别开启 Eureka 服务端和客户端,等待片刻自动进入调试模式,调用栈信息如下图:

Eureka 服务端源码分析:服务续约实现

从上图得知,客户端请求由 InstanceResource 的 renewLease() 方法来接收,然后将请求交给 InstanceRegistry 的 renew() 方法,InstanceRegistry 又将调用 PeerAwareInstanceRegistryImpl 类的 renew() 方法,PeerAwareInstanceRegistryImpl 将继续调用 AbstractInstanceRegistry 类的 renew() 方法。

InstanceRegistryPeerAwareInstanceRegistryImpl AbstractInstanceRegistry 类的关系如下图:

Eureka 服务端源码分析:服务续约实现

下面将从源码的角度逐一分析各个类中的 renew() 方法实现。

InstanceResource

该类将提供一个 PUT 类型的 HTTP 接口供 Eureka 客户端调用,通过该接口实现服务续约。该方法实现源码如下:

@PUT
public Response renewLease(
       @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
       @QueryParam("overriddenstatus") String overriddenStatus,
       @QueryParam("status") String status,
       @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
   boolean isFromReplicaNode = "true".equals(isReplication);
   // 服务续约(registry 为 InstanceRegistry 类型)
   boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

   // Not found in the registry, immediately ask for a register
   if (!isSuccess) {
       logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
       return Response.status(Status.NOT_FOUND).build();
   }
   // Check if we need to sync based on dirty time stamp, the client
   // instance might have changed some value
   Response response;
   if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
       response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
       // Store the overridden status since the validation found out the node that replicates wins
       if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
               && (overriddenStatus != null)
               && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
               && isFromReplicaNode) {
           registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
       }
   } else {
       response = Response.ok().build();
   }
   logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
   return response;
}

InstanceRegistry

该类实现了 InstanceRegistry 接口。InstanceRegistry 接口继承了 LookupService 、LeaseManager 接口,提供应用实例的注册与发现服务。另外,它结合实际业务场景,定义了更加丰富的接口方法。renew() 方法源码如下:

@Override
public boolean renew(final String appName, final String serverId, boolean isReplication) {
  log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
  // 从注册表中按 Application.getName() 的排序,获取所有 Applications 的列表。
  List<Application> applications = getSortedApplications();
  for (Application input : applications) {
     if (input.getName().equals(appName)) {
        InstanceInfo instance = null;
        for (InstanceInfo info : input.getInstances()) {
           if (info.getId().equals(serverId)) {
              instance = info;
              break;
           }
        }
        // 触发一个 EurekaInstanceRenewedEvent 事件,我们可以通过 ApplicationListener 监听该事件
        publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
        break;
     }
  }
  return super.renew(appName, serverId, isReplication);
}

PeerAwareInstanceRegistryImpl

InstanceRegistry 类在服务注册、续约、下线等操作完成后,会调用 PeerAwareInstanceRegistryImpl 的相关逻辑。而 PeerAwareInstanceRegistryImpl 中主要是添加了一个广播的功能,拥有了将服务实例的注册、续约、下线等操作同步到其它 Eureka Server 的能力。我们这里分析一下 renew() 方法,源码如下:

public boolean renew(final String appName, final String id, final boolean isReplication) {
   // 调用父类 renew() 方法
   if (super.renew(appName, id, isReplication)) {
       // 广播到其他节点
       replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
       return true;
   }
   return false;
}

/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
                             InstanceInfo info /* optional */,
                             InstanceStatus newStatus /* optional */, boolean isReplication) {
   Stopwatch tracer = action.getTimer().start();
   try {
       if (isReplication) {
           numberOfReplicationsLastMin.increment();
       }
       // If it is a replication already, do not replicate again as this will create a poison replication
       if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
           return;
       }

       for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
           // If the url represents this host, do not replicate to yourself.
           if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
               continue;
           }
           // 复制到其他节点
           replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
       }
   } finally {
       tracer.stop();
   }
}

AbstractInstanceRegistry

该类用于处理来自 Eureka 客户端的所有注册表请求。renew() 方法源码如下:

/**
* Marks the given instance of the given app name as renewed, and also marks whether it originated from
* replication.
*
* @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
*/
public boolean renew(String appName, String id, boolean isReplication) {
   RENEW.increment(isReplication);
    // 所有的服务信息都添加到 registry 这个 map 中,
    // 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
   Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
   Lease<InstanceInfo> leaseToRenew = null;
   if (gMap != null) {
       // 主要是为了获取当前服务的一些过期信息
       leaseToRenew = gMap.get(id);
   }
   if (leaseToRenew == null) {
       RENEW_NOT_FOUND.increment(isReplication);
       logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
       return false;
   } else {
       InstanceInfo instanceInfo = leaseToRenew.getHolder();
       if (instanceInfo != null) {
           // touchASGCache(instanceInfo.getASGName());
           InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                   instanceInfo, leaseToRenew, isReplication);
           if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
               logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                       + "; re-register required", instanceInfo.getId());
               RENEW_NOT_FOUND.increment(isReplication);
               return false;
           }
           if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
               logger.info(
                       "The instance status {} is different from overridden instance status {} for instance {}. "
                               + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                               overriddenInstanceStatus.name(),
                               instanceInfo.getId());
               instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

           }
       }
       renewsLastMin.increment();
       // 主要操作在这里,将最新更新时间重置,剔除任务检查的也就是这个最新更新时间
       leaseToRenew.renew();
       return true;
   }
}

// Lease.java
/**
* Renew the lease, use renewal duration if it was specified by the
* associated {@link T} during registration, otherwise default duration is
* {@link #DEFAULT_DURATION_IN_SECS}.
*/
public void renew() {
   // 这里去更新时间
   lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

服务续约的源码分析完了,下章节将接着分析服务下线的源码实现。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号