前面章节介绍了 Eureka 客户端和服务端的启动过程,同时也分析了 EnableDiscoveryClient 类的源码,本章节将介绍 Eureka 服务端是怎样实现服务注册的。
我们通过在 AbstractInstanceRegistry 类中的方法打断点,查看调用该方法的栈,如下图:

我们将依次分析上图中的 ApplicationResource、InstanceRegistry、PeerAwareInstanceRegistryImpl 和 AbstractInstanceRegistry 类关于服务注册部分逻辑。详细分析如下:
InstanceRegistry 类继承层次图如下:

该类将提供一个 POST 类型的 HTTP 接口供 Eureka 客户端调用,通过该接口可以将 Eureka 客户端的基础信息注册到 Eureka 服务端。该方法实现源码如下:
/**
* Registers information about a particular instance for an
* {@link com.netflix.discovery.shared.Application}.
*
* @param info
* {@link InstanceInfo} information of the instance. 实例信息
* @param isReplication
* a header parameter containing information whether this is
* replicated from other nodes. 一个 HTTP 头参数,指定是否从其他节点复制服务信息
*/
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
// 验证 instanceinfo 是否包含所有必需的必需字段
//...
// handle cases where clients may be registering with bad DataCenterInfo with missing data
// 处理客户端可能向缺少数据的错误 DataCenterInfo 注册的情况
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
//...
}
// 将注册业务交给 register() 方法处理
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}该类实现了 InstanceRegistry 接口。InstanceRegistry 接口继承了 LookupService 、LeaseManager 接口,提供应用实例的注册与发现服务。另外,它结合实际业务场景,定义了更加丰富的接口方法。这里仅仅分析 register() 方法,源码如下:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 触发一个 EurekaInstanceRegisteredEvent 事件,我们可以通过 ApplicationListener 监听该事件
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
// 调用父类的 register() 方法
// 父类为 PeerAwareInstanceRegistryImpl,继续分析 PeerAwareInstanceRegistryImpl 的 register()
super.register(info, isReplication);
}
private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration
+ ", isReplication " + isReplication);
publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
}你可以通过 ApplicationListener 去监听 EurekaInstanceRegisteredEvent 事件,例如:
@Component
public class MyApplicationListener implements ApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if(event instanceof EurekaInstanceRegisteredEvent) {
System.out.println("服务注册Event");
}
}
}InstanceRegistry 类在服务注册、续约、下线等操作完成后,会调用 PeerAwareInstanceRegistryImpl 的相关逻辑。而 PeerAwareInstanceRegistryImpl 中主要是添加了一个广播的功能,拥有了将服务实例的注册、续约、下线等操作同步到其它 Eureka Server 的能力。我们这里分析一下 register() 方法,源码如下:
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
* the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
* true if this is a replication event from other replica nodes,
* false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 调用父类 AbstractInstanceRegistry 注册服务
super.register(info, leaseDuration, isReplication);
// 广播消息,同步注册消息到其他节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}将所有 Eureka 操作复制到其他 Eureka 节点(不包含当前 Eureka 节点),replicateToPeers() 方法源码如下:
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
// 排除当前主机
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 复制信息到对于的节点
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}该类用于处理来自 Eureka 客户端的所有注册表请求。register() 方法源码如下:
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
// private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// private final Lock read = readWriteLock.readLock();
// 只读锁
read.lock();
try {
// 所有的服务信息都添加到 registry 这个 map 中,
// 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 如果没有该服务的信息,则新建,并添加到registry中
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// existingLease 信息即服务的一些注册时间等信息,主要是为了校验该服务是否过期,如果已过期,则剔除
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}服务注册信息最终存放到,外层 map 的 key 即为应用的服务名,内层 map 的 key 为我们设置的eureka.instance.instance-id,设置成这种格式,当多个应用提供相同服务时,那么外层 map 的 key 都相同,内层 map 的 key 不同。