// new DiscoveryClient 的时候初始化心跳定时任务周期性的调度// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,// 续约周期是 30s.也就是每 30s 执行一次renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}
boolean renew() {EurekaHttpResponse httpResponse;try {// 1、通过jerseyclient 去发送心中请求httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());// 2、如果 404 的话去发起 register 请求if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();// 这里是一个计数器,如果失败 + 1logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}// 3、成功返回 200,续约成功return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);return false;}}
通过断点调试可以知道,客户端发出的心跳(续约)请求,最终在服务端由 InstanceResource 的 renewLease 方法处理。
@PUTpublic Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// 如果注册表中没发现直接返回 404,那 client 发现 404 后就会执行 registry 逻辑去 addInstance(新增实例) 添加到 registry(注册表) 中去// Not found in the registry, immediately ask for a registerif (!isSuccess) {logger.warn("Not Found (Renew): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}// Check if we need to sync based on dirty time stamp, the client instance might have changed some valueResponse response = null;if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);// Store the overridden status since the validation found out the node that replicates winsif (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()&& (overriddenStatus != null)&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))&& isFromReplicaNode) {registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));}} else {response = Response.ok().build();}logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());return response;}
// super.renew() 主要是调用 AbstractInstanceRegistry 的 renew 方法public boolean renew(final String appName, final String id, final boolean isReplication) {if (super.renew(appName, id, isReplication)) {//这个就是复制到其他 eureka 服务节点replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);return true;}return false;}
public boolean renew(String appName, String id, boolean isReplication) {// 1、统计信息新增 1RENEW.increment(isReplication);// 2、从 registry(注册表)中获取信息 appName 为 key 的 Map 信息Map<String, Lease> gMap = registry.get(appName);Lease leaseToRenew = null;// 3、苑 Lease 信息if (gMap != null) {leaseToRenew = gMap.get(id);}// 4、如果为空则直接返回 404,客户端会发起 registry 请求(前面的文章看到过这样的逻辑处理)if (leaseToRenew == null) {RENEW_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);return false;} else {InstanceInfo instanceInfo = leaseToRenew.getHolder();if (instanceInfo != null) {// 4.1、InstanceStatus 判断,如果为 UNKNOWN,可能被删除覆盖了,所以可能客户端需要重新注册InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+ "; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);return false;}// 4.2、如果两者不一样,则更新 statusif (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {Object[] args = {instanceInfo.getStatus().name(),instanceInfo.getOverriddenStatus().name(),instanceInfo.getId()};logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+ "Hence setting the status to overridden status", args);instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}// 5、统计信息新增(上一分钟的续约次数)renewsLastMin.increment();// 6、更新 lastUpdateTimestampleaseToRenew.renew();return true;}}
private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus,PeerEurekaNode node) {try {InstanceInfo infoFromRegistry = null;CurrentRequestVersion.set(Version.V2);switch (action) {case Cancel:node.cancel(appName, id);break;case Heartbeat:// 这里我们可以知道会调用到这里来// 从 overriddenInstanceStatusMap 中获取 InstanceStatus,默认是 1 个小时后过期。InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);// 这个方法说一下infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);break;case Register:node.register(info);break;case StatusUpdate:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;case DeleteStatusOverride:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.deleteStatusOverride(appName, id, infoFromRegistry);break;}} catch (Throwable t) {logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);}}@Overridepublic InstanceInfo getInstanceByAppAndId(String appName, String id, boolean includeRemoteRegions) {// 1、从注册表中获取 appName 为 key 的 MapMap> leaseMap = registry.get(appName);Lease lease = null;if (leaseMap != null) {lease = leaseMap.get(id);}// 2、判断 Lease 如果不为空,且(Lease 开启没有开启或者 lease 没有过期)则返回装饰的 InstanceInfoif (lease != null&& (!isLeaseExpirationEnabled() || !lease.isExpired())) {return decorateInstanceInfo(lease);} else if (includeRemoteRegions) {for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {Application application = remoteRegistry.getApplication(appName);if (application != null) {return application.getByInstanceId(id);}}}return null;}@Overridepublic boolean isLeaseExpirationEnabled() {// 1、这个是判断自我保护模式是否开启(这个模式是默认开启的,后面EurekaServer 自我保护源码分析的时候会分析一下,怎么来判断等等)if (!isSelfPreservationModeEnabled()) {// The self preservation mode is disabled, hence allowing the instances 
to expire.//如果自我保护没有开启,说明此时是可以过期的,所以返回 true。// 因为如果自我保护开启,就不会进来,说明此时要判断一下心跳来判断是否 lease过期开启return true;}//2、阈值 > 0,且上一分钟的心跳数 > 阈值才说明 lease 过期开启否则返回 falsereturn numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;}
public void heartbeat(final String appName, final String id,final InstanceInfo info, final InstanceStatus overriddenStatus,boolean primeConnection) throws Throwable {// 1、传递的是 false 不会进来if (primeConnection) {// We do not care about the result for priming request.replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);return;}// 2、构造一个 ReplicationTaskReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {@Overridepublic EurekaHttpResponseexecute() throws Throwable {return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);}@Overridepublic void handleFailure(int statusCode, Object responseEntity) throws Throwable {super.handleFailure(statusCode, responseEntity);if (statusCode == 404) {logger.warn("{}: missing entry.", getTaskName());if (info != null) {logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",getTaskName(), info.getId(), info.getStatus());register(info);}} else if (config.shouldSyncWhenTimestampDiffers()) {InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;if (peerInstanceInfo != null) {syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);}}}};// 3、获取 expiryTimelong expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);// 4、分发到 batchingDispatcherbatchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);}
免责声明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack