Eureka 服务端源码分析:启动过程

前面章节分析了 Eureka 客户端源码,本章将分析 Eureka 服务端源码。同 Eureka Client 启动一样,需要添加@EnableEurekaServer 注解。

在该注解源码中用 @Import(EurekaServerMarkerConfiguration.class) 表明了程序在启动时会先加载 EurekaServerMarkerConfiguration 配置类中的配置,而在该配置类中,发布了一个标记类 EurekaServerMarkerConfiguration$Marker,该标记类会用于开启后续 EurekaServer 相关配置的加载工作。

@EnableEurekaServer 源码如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

EurekaServerMarkerConfiguration 类的源码如下:

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

  @Bean
  public Marker eurekaServerMarkerBean() {
     return new Marker();
  }

  class Marker {

  }

}

EurekaServerMarkerConfiguration 类是一个 @Configuration 配置类,且类中只有一个 Marker 类(该类是一个空类,没有任何实现),并且在该类的 eurekaServerMarkerBean() Bean 方法中创建了 Marker 类的实例。

从注释中可以看到 EurekaServerMarkerConfiguration 是一个激活 EurekaServerAutoConfiguration 的开关。通过之前章节的分析,我们实际可以发现 Spring Boot 相关项目的一些设计模式了,很多的类并不是被显示的加载到容器中,而是通过配置的方式,最经典的方式就是放到 META-INF/spring.factories 文件中去加载,那么我们也来看下 spring-cloud-netflix-eureka-server-{version}.jar!META-INF/spring.factories,具体内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
 org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

真正的配置信息在 EurekaServerAutoConfiguration 中,我们继续查看 EurekaServerAutoConfiguration 自动配置类,源码如下:

// 表示这是一个配置类
@Configuration(proxyBeanMethods = false)
// 导入启动 EurekaServer 的 bean
@Import(EurekaServerInitializerConfiguration.class)
// 表示只有在 spring 容器里面含有 Market 这个实例的时候,才会加载当前这个Bean(EurekaServerAutoConfiguration )
// 这就是控制是否开启 EurekaServer 的关键
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
// 加载配置文件
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   //...
}

该类要实现自动配置,必须满足 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) 条件,该条件要求必须在 BeanFactory 中存在 EurekaServerMarkerConfiguration.Marker 实例,才能正常被执行。

让我们继续看看 EurekaServerAutoConfiguration 类的源码到底干了什么,源码如下:

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
   // ...
   // Eureka-server 的可视化 web 界面就是通过 EurekaController 提供的
  @Bean
  @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
  public EurekaController eurekaController() {
     return new EurekaController(this.applicationInfoManager);
  }

   // ...
   // 接收客户端的注册等请求就是通过 InstanceRegistry 来处理的,是真正处理业务的类
  @Bean
  public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
     this.eurekaClient.getApplications(); // force initialization
     return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
           this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
           this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
  }
  // ...
  // 初始化 Eureka-server,会同步其他注册中心的数据到当前注册中心
  @Bean
  public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
        EurekaServerContext serverContext) {
     return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
           registry, serverContext);
  }

  // 创建过滤器,并匹配路径 /eureka/*
  @Bean
  public FilterRegistrationBean<?> jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {
     FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
     bean.setFilter(new ServletContainer(eurekaJerseyApp));
     bean.setOrder(Ordered.LOWEST_PRECEDENCE);
     bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
     return bean;
  }
  //...
}

通过上面的源码分析可知,EurekaServer 在启动的时候,会加载很多 bean 到 Spring 容器中,每个 bean 都实现了各自的功能,其中真正处理客户端请求的类是 InstanceRegistry。源码如下:

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {
   //...
   // 接收 Eureka 客户端的服务注册请求
  @Override
  public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
     handleRegistration(info, leaseDuration, isReplication);
     super.register(info, leaseDuration, isReplication);
  }

  @Override
  public void register(final InstanceInfo info, final boolean isReplication) {
     handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
     super.register(info, isReplication);
  }

  // 接收 Eureka 客户端服务下线请求
  @Override
  public boolean cancel(String appName, String serverId, boolean isReplication) {
     handleCancelation(appName, serverId, isReplication);
     return super.cancel(appName, serverId, isReplication);
  }

  // 接收 Eureka 客户端服务续约请求
  @Override
  public boolean renew(final String appName, final String serverId, boolean isReplication) {
     log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
     List<Application> applications = getSortedApplications();
     for (Application input : applications) {
        if (input.getName().equals(appName)) {
           InstanceInfo instance = null;
           for (InstanceInfo info : input.getInstances()) {
              if (info.getId().equals(serverId)) {
                 instance = info;
                 break;
              }
           }
           publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
           break;
        }
     }
     return super.renew(appName, serverId, isReplication);
  }
  //...
}

以上是在处理客户端的不同请求。但是客户端发送的是 HTTP 请求,这只是一个类,服务端应该也有一个接收 HTTP 请求的类,然后将接收到的请求封装后委托给 InstanceRegistry 来处理具体业务。这个类就是 com.netflix.eureka.resources 包下的ApplicationResource、InstanceResource。

ApplicationResource 类源码如下:

@Produces({"application/xml", "application/json"})
public class ApplicationResource {
   //...
   private final EurekaServerConfig serverConfig;
   private final PeerAwareInstanceRegistry registry;
   private final ResponseCache responseCache;
   //...

   // 获取有关特定 Application 的信息
   @GET
   public Response getApplication(@PathParam("version") String version,
                                  @HeaderParam("Accept") final String acceptHeader,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
       //...
   }

   // 获取有关应用程序特定实例的信息
   @Path("{id}")
   public InstanceResource getInstanceInfo(@PathParam("id") String id) {
       return new InstanceResource(this, id, serverConfig, registry);
   }

   // 为 Application 注册有关特定实例的信息
   @POST
   @Consumes({"application/json", "application/xml"})
   public Response addInstance(InstanceInfo info,
                               @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
       //...
       registry.register(info, "true".equals(isReplication));
       return Response.status(204).build();  // 204 to be backwards compatible
   }

   //...
}

InstanceResource 类源码如下:

@Produces({"application/xml", "application/json"})
public class InstanceResource {
   //...
   private final PeerAwareInstanceRegistry registry;
   private final EurekaServerConfig serverConfig;
   //...
   // Get请求返回关于实例的InstanceInfo的信息
   @GET
   public Response getInstanceInfo() {
       InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id);
       if (appInfo != null) {
           logger.debug("Found: {} - {}", app.getName(), id);
           return Response.ok(appInfo).build();
       } else {
           logger.debug("Not Found: {} - {}", app.getName(), id);
           return Response.status(Status.NOT_FOUND).build();
       }
   }

   // 从客户端实例更新租约的put请求
   @PUT
   public Response renewLease(
           @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
           @QueryParam("overriddenstatus") String overriddenStatus,
           @QueryParam("status") String status,
           @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
       boolean isFromReplicaNode = "true".equals(isReplication);
       boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
       //...
       return response;
   }

   // 处理 InstanceStatus 更新
   @PUT
   @Path("status")
   public Response statusUpdate(
           @QueryParam("value") String newStatus,
           @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
           @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
       try {
           //...
           boolean isSuccess = registry.statusUpdate(app.getName(), id,
                   InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                   "true".equals(isReplication));
           //...
       } catch (Throwable e) {
           logger.error("Error updating instance {} for status {}", id,
                   newStatus);
           return Response.serverError().build();
       }
   }

   // 删除实例的状态
   @DELETE
   @Path("status")
   public Response deleteStatusUpdate(
           @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
           @QueryParam("value") String newStatusValue,
           @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
       try {
           //....
           InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue);
           boolean isSuccess = registry.deleteStatusOverride(app.getName(), id,
                   newStatus, lastDirtyTimestamp, "true".equals(isReplication));
           //...
       } catch (Throwable e) {
           logger.error("Error removing instance's {} status override", id);
           return Response.serverError().build();
       }
   }

   // 更新特定于用户的元数据信息
   @PUT
   @Path("metadata")
   public Response updateMetadata(@Context UriInfo uriInfo) {
       try {
           InstanceInfo instanceInfo = registry.getInstanceByAppAndId(app.getName(), id);
           //...
           Map<String, String> metadataMap = instanceInfo.getMetadata();
           // Metadata map is empty - create a new map
           if (Collections.emptyMap().getClass().equals(metadataMap.getClass())) {
               metadataMap = new ConcurrentHashMap<>();
               InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
               builder.setMetadata(metadataMap);
               instanceInfo = builder.build();
           }
           // Add all the user supplied entries to the map
           for (Entry<String, List<String>> entry : entrySet) {
               metadataMap.put(entry.getKey(), entry.getValue().get(0));
           }
           //...
           registry.register(instanceInfo, false);
           return Response.ok().build();
       } catch (Throwable e) {
           logger.error("Error updating metadata for instance {}", id, e);
           return Response.serverError().build();
       }

   }

   // 处理此特定实例的租约取消
   @DELETE
   public Response cancelLease(
           @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
       try {
           boolean isSuccess = registry.cancel(app.getName(), id,
               "true".equals(isReplication));
           //...
       } catch (Throwable e) {
           logger.error("Error (cancel): {} - {}", app.getName(), id, e);
           return Response.serverError().build();
       }

   }
   //...
}

到这里,Eureka 客户端和服务端是怎样启动?以及启动过程就分析完了,后续将继续分析服务注册、取消服务等。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号