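Earlier chapters analyzed the source code for Eureka service registration and lease renewal. This chapter analyzes how Eureka implements service cancellation, that is, taking a service instance offline.
When a service instance shuts down normally, it sends a cancel message to the registry. On receiving it, the registry marks the instance as offline and propagates that change. The cancel request is not sent automatically; the client has to trigger it with the following call: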
DiscoveryManager.getInstance().shutdownComponent()
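One way to make sure such a call runs on a normal exit is to register it in a JVM shutdown hook. The sketch below is only an illustration and is not how Eureka itself wires its shutdown: `GracefulShutdownSketch` and the `deregister` callback are hypothetical names, and the callback stands in for the call shown above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: runs a deregistration callback at most once.
class GracefulShutdownSketch {
    // Guards against sending the cancel request more than once.
    private final AtomicBoolean done = new AtomicBoolean(false);
    // Assumption: stands in for the shutdown call on the Eureka client.
    private final Runnable deregister;

    GracefulShutdownSketch(Runnable deregister) {
        this.deregister = deregister;
    }

    // Returns true only for the invocation that actually ran the callback.
    boolean shutdown() {
        if (done.compareAndSet(false, true)) {
            deregister.run();
            return true;
        }
        return false;
    }

    // Registers the callback so it runs when the JVM exits normally.
    void installHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "deregister-hook"));
    }

    public static void main(String[] args) {
        GracefulShutdownSketch sketch =
                new GracefulShutdownSketch(() -> System.out.println("cancel request sent"));
        sketch.installHook();
        System.out.println("service running");
    }
}
```

The `AtomicBoolean` guard matters because the same shutdown path may be reached both explicitly and through the hook; without it the registry could receive a second cancel for an instance it has already removed.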
Before starting the analysis, set a breakpoint in AbstractInstanceRegistry.internalCancel() and inspect the call stack, as shown in the figure below:

The figure shows that InstanceResource.cancelLease() receives the cancel request from the Eureka client. The request then passes through the following methods in order:
InstanceResource.cancelLease()
InstanceRegistry.cancel()
PeerAwareInstanceRegistryImpl.cancel()
AbstractInstanceRegistry.cancel()
InstanceRegistry.internalCancel()
AbstractInstanceRegistry.internalCancel()
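The sections below analyze the source of each of these methods in turn. The inheritance hierarchy of InstanceRegistry, PeerAwareInstanceRegistryImpl, and AbstractInstanceRegistry is shown in the figure below: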
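As a rough model of that hierarchy and of the call order listed earlier, the following simplified sketch reproduces the dispatch using only `super` calls and virtual method dispatch. The class names ending in `Sketch` and the `trace` list are hypothetical; only the inheritance order and the method names mirror the real classes, and the method bodies are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point: prints the order in which the registry methods are reached.
class CallOrderDemo {
    public static void main(String[] args) {
        InstanceRegistrySketch registry = new InstanceRegistrySketch();
        registry.cancel("APP", "instance-1", false);
        registry.trace.forEach(System.out::println);
    }
}

// Stand-in for AbstractInstanceRegistry (bottom of the chain).
class AbstractRegistrySketch {
    final List<String> trace = new ArrayList<>();

    boolean cancel(String appName, String id, boolean isReplication) {
        trace.add("AbstractInstanceRegistry.cancel()");
        // Virtual dispatch: this resolves to the most derived internalCancel().
        return internalCancel(appName, id, isReplication);
    }

    boolean internalCancel(String appName, String id, boolean isReplication) {
        trace.add("AbstractInstanceRegistry.internalCancel()");
        return true;
    }
}

// Stand-in for PeerAwareInstanceRegistryImpl (replication step omitted).
class PeerAwareRegistrySketch extends AbstractRegistrySketch {
    @Override
    boolean cancel(String appName, String id, boolean isReplication) {
        trace.add("PeerAwareInstanceRegistryImpl.cancel()");
        return super.cancel(appName, id, isReplication);
    }
}

// Stand-in for InstanceRegistry (event publishing omitted).
class InstanceRegistrySketch extends PeerAwareRegistrySketch {
    @Override
    boolean cancel(String appName, String id, boolean isReplication) {
        trace.add("InstanceRegistry.cancel()");
        return super.cancel(appName, id, isReplication);
    }

    @Override
    boolean internalCancel(String appName, String id, boolean isReplication) {
        trace.add("InstanceRegistry.internalCancel()");
        return super.internalCancel(appName, id, isReplication);
    }
}
```

The point of the sketch is the fourth step: AbstractInstanceRegistry.cancel() calls internalCancel() on `this`, so control goes back to the most derived override in InstanceRegistry before it reaches the base implementation.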

InstanceResource.cancelLease() receives the cancel request sent by the Eureka client. The source is as follows:
/**
 * Handles cancellation of leases for this particular instance.
 *
 * @param isReplication
 *            a header parameter containing information whether this is
 *            replicated from other nodes.
 * @return response indicating whether the operation was a success or
 *         failure.
 */
@DELETE
public Response cancelLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    try {
        // Delegate to cancel() to process the cancel request
        boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));
        if (isSuccess) {
            logger.debug("Found (Cancel): {} - {}", app.getName(), id);
            return Response.ok().build();
        } else {
            logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
    } catch (Throwable e) {
        logger.error("Error (cancel): {} - {}", app.getName(), id, e);
        return Response.serverError().build();
    }
}
InstanceRegistry.cancel() publishes an EurekaInstanceCanceledEvent by calling handleCancelation(). The source is as follows:
@Override
public boolean cancel(String appName, String serverId, boolean isReplication) {
    // Publish an EurekaInstanceCanceledEvent; you can listen for it with an ApplicationListener
    // and run additional business logic there
    handleCancelation(appName, serverId, isReplication);
    return super.cancel(appName, serverId, isReplication);
}

private void handleCancelation(String appName, String id, boolean isReplication) {
    log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
    publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}
PeerAwareInstanceRegistryImpl adds the ability to synchronize the current node's registry information to the other nodes. Its cancel() method first calls the parent class's cancel() and, only if that call succeeds, replicates the cancel operation to the other Eureka nodes. Note that the current node is excluded. The source is as follows:
/**
 * (non-Javadoc)
 *
 * @see com.netflix.eureka.registry.InstanceRegistry#cancel(java.lang.String,
 * java.lang.String, long, boolean)
 */
@Override
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    // Call the parent class's cancel() to continue processing the cancel request.
    // If the cancellation succeeds, replicate the operation to the other Eureka nodes.
    if (super.cancel(appName, id, isReplication)) {
        // Replicate the cancel operation to the other Eureka nodes, excluding the current node
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate the action to the peer Eureka node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
AbstractInstanceRegistry.cancel() cancels the service registration. The source is as follows:
/**
 * Cancels the registration of an instance.
 *
 * <p>
 * This is normally invoked by a client when it shuts down informing the
 * server to remove the instance from traffic.
 * </p>
 *
 * @param appName the application name of the application.
 * @param id the unique identifier of the instance.
 * @param isReplication true if this is a replication event from other nodes, false
 *                      otherwise.
 * @return true if the instance was removed from the {@link AbstractInstanceRegistry} successfully, false otherwise.
 */
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
    // Cancel the instance's registration
    return internalCancel(appName, id, isReplication);
}
InstanceRegistry.internalCancel() publishes the EurekaInstanceCanceledEvent and then calls the parent class's implementation. The source is as follows:
@Override
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // Publish an EurekaInstanceCanceledEvent; you can listen for it with an ApplicationListener
    handleCancelation(appName, id, isReplication);
    // Call the parent class's internalCancel()
    return super.internalCancel(appName, id, isReplication);
}

private void handleCancelation(String appName, String id, boolean isReplication) {
    log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
    publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
}
AbstractInstanceRegistry.internalCancel() removes the lease from the registry. The source is as follows:
/**
 * {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
 * cancel request is replicated to the peers. This is however not desired for expires which would be counted
 * in the remote peers as valid cancellations, so self preservation mode would not kick-in.
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // private final Lock read = readWriteLock.readLock();
    // Acquire the read lock of the reentrant read-write lock declared above
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // All service information is stored in the registry map, whose type is
        // ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // Set the lease's eviction timestamp to the current time: evictionTimestamp = System.currentTimeMillis();
            leaseToCancel.cancel();
            // Get the instance information
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                // Mark the instance information as deleted
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // Invalidate the cache
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
        }
    } finally {
        read.unlock();
    }
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();
        }
    }
    return true;
}
That concludes this analysis of the Eureka source code. It is intended for study and discussion only; if you find any problems, please report them. Thank you.