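The previous chapters analyzed the Eureka client source code; this chapter analyzes the Eureka server source code. As with starting a Eureka client, an annotation has to be added to the application class; for the server it is @EnableEurekaServer.
In the annotation's source, @Import(EurekaServerMarkerConfiguration.class) indicates that the EurekaServerMarkerConfiguration configuration class is loaded first at startup. That configuration class publishes a marker bean, EurekaServerMarkerConfiguration$Marker, which is used to switch on the loading of the remaining Eureka server configuration.
The source of @EnableEurekaServer is as follows: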
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
The source of the EurekaServerMarkerConfiguration class is as follows:
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }
    class Marker {
    }
}
EurekaServerMarkerConfiguration is a @Configuration class that contains a single nested class, Marker (an empty class with no implementation), and its eurekaServerMarkerBean() @Bean method creates an instance of Marker.
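From the class's comments we can see that EurekaServerMarkerConfiguration is a switch that activates EurekaServerAutoConfiguration. The analysis in earlier chapters already revealed a recurring design pattern in Spring Boot projects: many classes are not loaded into the container explicitly but through configuration, and the most classic approach is to list them in a META-INF/spring.factories file. So let us look at spring-cloud-netflix-eureka-server-{version}.jar!META-INF/spring.factories, whose content is as follows: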
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
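To make the file format concrete, here is a minimal sketch in plain Java. It assumes only that spring.factories follows the standard java.util.Properties syntax, where a trailing backslash continues the value on the next line and commas separate class names. The class name SpringFactoriesSketch and the method autoConfigurations are hypothetical names chosen for this illustration; Spring Boot's own loader does considerably more than this (for example, it merges the files found in every jar on the classpath).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring: reads spring.factories-style text.
final class SpringFactoriesSketch {

    static final String KEY =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Returns the class names listed under KEY, in file order.
    static List<String> autoConfigurations(String factoriesText) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash and
            // drops the leading whitespace of the continuation line.
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(KEY, "");
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String text = KEY + "=\\\n"
                + "  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration\n";
        // Prints the single auto-configuration class declared by the Eureka server jar.
        System.out.println(autoConfigurations(text));
    }
}
```

The sketch only shows why the two physical lines in the file form one key/value pair; which of the listed classes is actually applied is decided later by the conditions declared on each class.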
The real configuration lives in EurekaServerAutoConfiguration, so we continue with this auto-configuration class. Its source is as follows:
// Marks this class as a configuration class
@Configuration(proxyBeanMethods = false)
// Imports the configuration that starts the Eureka server
@Import(EurekaServerInitializerConfiguration.class)
// This configuration (EurekaServerAutoConfiguration) is loaded only when the Spring container contains a Marker bean;
// this is the key that controls whether the Eureka server is enabled
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
// Loads the properties file
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    //...
}
For this class to be auto-configured, the @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) condition must hold: an EurekaServerMarkerConfiguration.Marker instance must exist in the BeanFactory before the configuration is applied.
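The marker mechanism can be illustrated with a small stand-alone sketch. It does not use Spring at all: MiniContainer, registerIfPresent, and ServerAutoConfig are hypothetical names invented here, and the "container" is just a map from bean type to instance. The only point is to show how an empty marker object can act as the switch for a larger configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical simulation of a conditional-on-bean check; not Spring code.
final class MarkerGateSketch {

    // Stand-in for EurekaServerMarkerConfiguration.Marker: an empty type used only as a flag.
    static final class Marker { }

    // Stand-in for the configuration that should load only when the marker exists.
    static final class ServerAutoConfig { }

    // Miniature "container": bean type -> bean instance.
    static final class MiniContainer {
        private final Map<Class<?>, Object> beans = new LinkedHashMap<>();

        <T> void register(Class<T> type, T bean) {
            beans.put(type, bean);
        }

        boolean contains(Class<?> type) {
            return beans.containsKey(type);
        }

        // Creates and registers the bean only if a bean of type `required` is already present.
        <T> boolean registerIfPresent(Class<?> required, Class<T> type, Supplier<T> factory) {
            if (!contains(required)) {
                return false;
            }
            register(type, factory.get());
            return true;
        }
    }

    // enableServer plays the role of putting @EnableEurekaServer on the application class.
    static boolean serverConfigured(boolean enableServer) {
        MiniContainer container = new MiniContainer();
        if (enableServer) {
            container.register(Marker.class, new Marker());
        }
        container.registerIfPresent(Marker.class, ServerAutoConfig.class, ServerAutoConfig::new);
        return container.contains(ServerAutoConfig.class);
    }

    public static void main(String[] args) {
        System.out.println("with marker:    " + serverConfigured(true));
        System.out.println("without marker: " + serverConfigured(false));
    }
}
```

Using an empty marker type keeps the annotation itself trivial: the jar can always be on the classpath, and the server configuration stays dormant until the marker bean is published.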
Let us look further at what EurekaServerAutoConfiguration actually does. The source is as follows:
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    // ...
    // The Eureka server's web dashboard is served by EurekaController
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }
    // ...
    // Client requests such as registration are handled by InstanceRegistry, the class that does the real work
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
    // ...
    // Initializes the Eureka server and syncs data from the other registries into this one
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
                registry, serverContext);
    }
    // Creates the Jersey filter and maps it to the path /eureka/*
    @Bean
    public FilterRegistrationBean<?> jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
        return bean;
    }
    //...
}
The source above shows that when the Eureka server starts, it loads many beans into the Spring container, each with its own responsibility; the class that actually handles client requests is InstanceRegistry. Its source is as follows:
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {
    //...
    // Handles service registration requests from Eureka clients
    @Override
    public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
        handleRegistration(info, leaseDuration, isReplication);
        super.register(info, leaseDuration, isReplication);
    }
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
        super.register(info, isReplication);
    }
    // Handles service cancellation (deregistration) requests from Eureka clients
    @Override
    public boolean cancel(String appName, String serverId, boolean isReplication) {
        handleCancelation(appName, serverId, isReplication);
        return super.cancel(appName, serverId, isReplication);
    }
    // Handles lease renewal requests from Eureka clients
    @Override
    public boolean renew(final String appName, final String serverId, boolean isReplication) {
        log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
        List<Application> applications = getSortedApplications();
        for (Application input : applications) {
            if (input.getName().equals(appName)) {
                InstanceInfo instance = null;
                for (InstanceInfo info : input.getInstances()) {
                    if (info.getId().equals(serverId)) {
                        instance = info;
                        break;
                    }
                }
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
                break;
            }
        }
        return super.renew(appName, serverId, isReplication);
    }
    //...
}
The methods above handle the different kinds of client requests. However, clients send HTTP requests, and InstanceRegistry is just a plain class, so the server must also have classes that receive the HTTP requests, wrap them, and delegate the actual work to InstanceRegistry. Those classes are ApplicationResource and InstanceResource in the com.netflix.eureka.resources package.
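The two layers described here can be sketched in plain Java without Eureka or Jersey on the classpath. Everything in the block is a simplified stand-in with hypothetical names: BaseRegistry plays the role of the parent registry, EventPublishingRegistry mirrors the "record an event, then call super" shape of InstanceRegistry, and LeaseResource mimics how a resource class turns the replication header string into a boolean before delegating. The numeric status codes are assumptions chosen for illustration and are not taken from the excerpts in this chapter.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for the registry and resource layers; not Eureka code.
final class RegistryDelegationSketch {

    // Core registry: keeps the set of live leases, keyed by "app/id".
    static class BaseRegistry {
        private final Set<String> leases = new HashSet<>();

        void register(String app, String id, boolean isReplication) {
            leases.add(app + "/" + id);
        }

        boolean renew(String app, String id, boolean isReplication) {
            return leases.contains(app + "/" + id);
        }

        boolean cancel(String app, String id, boolean isReplication) {
            return leases.remove(app + "/" + id);
        }
    }

    // Decorating subclass: records an event first, then delegates to the parent.
    static class EventPublishingRegistry extends BaseRegistry {
        final List<String> events = new ArrayList<>();

        @Override
        void register(String app, String id, boolean isReplication) {
            events.add("registered " + app + "/" + id);
            super.register(app, id, isReplication);
        }

        @Override
        boolean renew(String app, String id, boolean isReplication) {
            events.add("renewed " + app + "/" + id);
            return super.renew(app, id, isReplication);
        }

        @Override
        boolean cancel(String app, String id, boolean isReplication) {
            events.add("canceled " + app + "/" + id);
            return super.cancel(app, id, isReplication);
        }
    }

    // Resource layer: converts request data and delegates to the registry.
    static final class LeaseResource {
        private final BaseRegistry registry;
        private final String app;
        private final String id;

        LeaseResource(BaseRegistry registry, String app, String id) {
            this.registry = registry;
            this.app = app;
            this.id = id;
        }

        // replicationHeader is the raw header value; null means the header was absent.
        int renewLease(String replicationHeader) {
            boolean isReplication = "true".equals(replicationHeader);
            return registry.renew(app, id, isReplication) ? 200 : 404; // assumed codes
        }

        int cancelLease(String replicationHeader) {
            boolean isReplication = "true".equals(replicationHeader);
            return registry.cancel(app, id, isReplication) ? 200 : 404; // assumed codes
        }
    }

    public static void main(String[] args) {
        EventPublishingRegistry registry = new EventPublishingRegistry();
        registry.register("ORDERS", "i-1", false);
        LeaseResource resource = new LeaseResource(registry, "ORDERS", "i-1");
        System.out.println(resource.renewLease(null));
        System.out.println(registry.events);
    }
}
```

Keeping the HTTP-facing class this thin means the registry can be exercised directly, without a servlet container, and the event publishing stays in one place regardless of which endpoint triggered it.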
The source of the ApplicationResource class is as follows:
@Produces({"application/xml", "application/json"})
public class ApplicationResource {
    //...
    private final EurekaServerConfig serverConfig;
    private final PeerAwareInstanceRegistry registry;
    private final ResponseCache responseCache;
    //...
    // Gets information about a particular Application
    @GET
    public Response getApplication(@PathParam("version") String version,
            @HeaderParam("Accept") final String acceptHeader,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
        //...
    }
    // Gets information about a particular instance of the application
    @Path("{id}")
    public InstanceResource getInstanceInfo(@PathParam("id") String id) {
        return new InstanceResource(this, id, serverConfig, registry);
    }
    // Registers information about a particular instance for the Application
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        //...
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build(); // 204 to be backwards compatible
    }
    //...
}
The source of the InstanceResource class is as follows:
@Produces({"application/xml", "application/json"})
public class InstanceResource {
    //...
    private final PeerAwareInstanceRegistry registry;
    private final EurekaServerConfig serverConfig;
    //...
    // GET request: returns the InstanceInfo for this instance
    @GET
    public Response getInstanceInfo() {
        InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id);
        if (appInfo != null) {
            logger.debug("Found: {} - {}", app.getName(), id);
            return Response.ok(appInfo).build();
        } else {
            logger.debug("Not Found: {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
    }
    // PUT request: renews the lease of a client instance
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
        //...
        return response;
    }
    // Handles InstanceStatus updates
    @PUT
    @Path("status")
    public Response statusUpdate(
            @QueryParam("value") String newStatus,
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        try {
            //...
            boolean isSuccess = registry.statusUpdate(app.getName(), id,
                    InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                    "true".equals(isReplication));
            //...
        } catch (Throwable e) {
            logger.error("Error updating instance {} for status {}", id,
                    newStatus);
            return Response.serverError().build();
        }
    }
    // Removes the status override of the instance
    @DELETE
    @Path("status")
    public Response deleteStatusUpdate(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("value") String newStatusValue,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        try {
            //....
            InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue);
            boolean isSuccess = registry.deleteStatusOverride(app.getName(), id,
                    newStatus, lastDirtyTimestamp, "true".equals(isReplication));
            //...
        } catch (Throwable e) {
            logger.error("Error removing instance's {} status override", id);
            return Response.serverError().build();
        }
    }
    // Updates user-specific metadata
    @PUT
    @Path("metadata")
    public Response updateMetadata(@Context UriInfo uriInfo) {
        try {
            InstanceInfo instanceInfo = registry.getInstanceByAppAndId(app.getName(), id);
            //...
            Map<String, String> metadataMap = instanceInfo.getMetadata();
            // Metadata map is empty - create a new map
            if (Collections.emptyMap().getClass().equals(metadataMap.getClass())) {
                metadataMap = new ConcurrentHashMap<>();
                InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
                builder.setMetadata(metadataMap);
                instanceInfo = builder.build();
            }
            // Add all the user supplied entries to the map
            for (Entry<String, List<String>> entry : entrySet) {
                metadataMap.put(entry.getKey(), entry.getValue().get(0));
            }
            //...
            registry.register(instanceInfo, false);
            return Response.ok().build();
        } catch (Throwable e) {
            logger.error("Error updating metadata for instance {}", id, e);
            return Response.serverError().build();
        }
    }
    // Handles cancellation of the lease for this particular instance
    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                    "true".equals(isReplication));
            //...
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }
    }
    //...
}
This completes the analysis of how the Eureka client and the Eureka server start up. The following chapters continue with service registration, service cancellation, and related topics.