Eureka 源码分析(七)

在服务注册完成后,Eureka 客户会每隔 30 秒发送一次心跳来续约。通过续约来告知 Eureka 服务端该 Eureka 客户仍然存在,没有出现问题。正常情况下,如果 Eureka 服务端在 90 秒没有收到 Eureka 客户的续约,它会将实例从其注册表中删除。

前面章节分析了 Eureka 服务注册的源码,本章将接着分析 Eureka 服务续约的源码实现。

在服务注册完成后,Eureka 客户会每隔 30 秒发送一次心跳来续约。通过续约来告知 Eureka 服务端该 Eureka 客户仍然存在,没有出现问题。正常情况下,如果 Eureka 服务端在 90 秒没有收到 Eureka 客户的续约,它会将实例从其注册表中删除。

注意:建议不要轻易更改续约间隔。

先找到 AbstractInstanceRegistry 类,在 renew() 方法中打一个断点,然后分别开启 Eureka 服务端和客户端,等待片刻自动进入调试模式,调用栈信息如下图:

Eureka 源码分析(七)

从上图得知,客户端请求由 InstanceResource 的 renewLease() 方法来接收,然后将请求交给 InstanceRegistry 的 renew() 方法,InstanceRegistry 又将调用 PeerAwareInstanceRegistryImpl 类的 renew() 方法,PeerAwareInstanceRegistryImpl 将继续调用 AbstractInstanceRegistry 类的 renew() 方法。

InstanceRegistry、PeerAwareInstanceRegistryImpl 和 AbstractInstanceRegistry 类的关系如下图:

Eureka 源码分析(七)

下面将从源码的角度逐一分析各个类中的 renew() 方法实现。

InstanceResource

该类将提供一个 PUT 类型的 HTTP 接口供 Eureka 客户端调用,通过该接口实现服务续约。该方法实现源码如下:

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // 服务续约(registry 为 InstanceRegistry 类型)
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // Not found in the registry, immediately ask for a register
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

InstanceRegistry

该类实现了 InstanceRegistry 接口。InstanceRegistry 接口继承了 LookupService 、LeaseManager 接口,提供应用实例的注册与发现服务。另外,它结合实际业务场景,定义了更加丰富的接口方法。renew() 方法源码如下:

@Override
public boolean renew(final String appName, final String serverId, boolean isReplication) {
   log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
   // 从注册表中按 Application.getName() 的排序,获取所有 Applications 的列表。
   List<Application> applications = getSortedApplications();
   for (Application input : applications) {
      if (input.getName().equals(appName)) {
         InstanceInfo instance = null;
         for (InstanceInfo info : input.getInstances()) {
            if (info.getId().equals(serverId)) {
               instance = info;
               break;
            }
         }
         // 触发一个 EurekaInstanceRenewedEvent 事件,我们可以通过 ApplicationListener 监听该事件
         publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
         break;
      }
   }
   return super.renew(appName, serverId, isReplication);
}

PeerAwareInstanceRegistryImpl

InstanceRegistry 类在服务注册、续约、下线等操作完成后,会调用 PeerAwareInstanceRegistryImpl 的相关逻辑。而 PeerAwareInstanceRegistryImpl 中主要是添加了一个广播的功能,拥有了将服务实例的注册、续约、下线等操作同步到其它 Eureka Server 的能力。我们这里分析一下 renew() 方法,源码如下:

public boolean renew(final String appName, final String id, final boolean isReplication) {
    // 调用父类 renew() 方法
    if (super.renew(appName, id, isReplication)) {
        // 广播到其他节点
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 *
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 复制到其他节点
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

AbstractInstanceRegistry

该类用于处理来自 Eureka 客户端的所有注册表请求。renew() 方法源码如下:

/**
 * Marks the given instance of the given app name as renewed, and also marks whether it originated from
 * replication.
 *
 * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
 */
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
     // 所有的服务信息都添加到 registry 这个 map 中,
     // 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        // 主要是为了获取当前服务的一些过期信息
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                overriddenInstanceStatus.name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        renewsLastMin.increment();
        // 主要操作在这里,将最新更新时间重置,剔除任务检查的也就是这个最新更新时间
        leaseToRenew.renew();
        return true;
    }
}

// Lease.java
/**
 * Renew the lease, use renewal duration if it was specified by the
 * associated {@link T} during registration, otherwise default duration is
 * {@link #DEFAULT_DURATION_IN_SECS}.
 */
public void renew() {
    // 这里去更新时间
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

服务续约的源码分析完了,下章节将接着分析服务下线的源码实现。

人生就像赛跑,不在乎你是否第一个到达终点,而在乎你有没有跑完全程。
0 不喜欢
说说我的看法 -
全部评论(
没有评论
目录
热门标签
热门文章
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号