Nacos服务端在实例进行注册时,会在服务初始化的时候开启一个客户端心跳检测任务ClientBeatCheckTask。
ClientBeatCheckTask的添加
每一个Service初始化时都会添加一个与之对应的一个ClientBeatCheckTask。
com.alibaba.nacos.naming.core.Service#init
public void init() {
// 开启客户端心跳检测任务
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
这个任务默认5秒执行一次。
com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask)
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
ClientBeatCheckTask的执行
ClientBeatCheckTask实现了Runnable接口,主要看run()方法。
遍历服务中的所有实例,如果心跳时间超过15s就将健康状态标记为false,如果心跳时间超过30s就会删除实例。
将健康状态标记为false时会发布InstanceHeartbeatTimeoutEvent和ServiceChangeEvent两个事件。
InstanceHeartbeatTimeoutEvent没有地方监听此事件。
ServiceChangeEvent由本身发布事件的PushService监听。
com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
// 心跳时间超过15s就将健康状态标记为false
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
if (!getGlobalConfig().isExpireInstance()) {
return;
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
// 心跳时间超过30s就会删除实例
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
ClientBeatCheckTask#deleteIp
调用自己的/nacos/v1/ns/instance注销实例。
com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#deleteIp
private void deleteIp(Instance instance) {
try {
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
.appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
.appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
// 调用自己的/nacos/v1/ns/instance注销实例
String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
// delete instance asynchronously:
HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJson(), result.getMessage(), result.getCode());
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
throwable);
@Override
public void onCancel() {
} catch (Exception e) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
}
InstanceController#deregister
com.alibaba.nacos.naming.controllers.InstanceController#deregister
public String deregister(HttpServletRequest request) throws Exception {
Instance instance = getIpAddress(request);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 查询出Service
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
return "ok";
// 删除实例
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
return "ok";
}
ServiceManager#removeInstance
这里只是将注册表的实例列表取出然后删除,并放入缓存中,并没有删除注册表中的实例。
com.alibaba.nacos.naming.core.ServiceManager#removeInstance(java.lang.String, java.lang.String, boolean, com.alibaba.nacos.naming.core.Instance…)
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
Service service = getService(namespaceId, serviceName);
synchronized (service) {
removeInstance(namespaceId, serviceName, ephemeral, service, ips);
private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
Instance... ips) throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 将缓存中的实例列表删除实例
List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 更新缓存
consistencyService.put(key, instances);
private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips)
throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
}
真正从注册表中删除实例与注册实例一样通过同一个异步任务来完成。
ServiceChangeEvent事件的监听
PushService监听了ServiceChangeEvent事件。
当服务变更时,Nacos服务端会通过UDP通知所有监听此服务的客户端。这里为了使客户端能够实时的知道服务的实例状态变更了,又为了不增加服务器的压力,所以使用了UDP,因为UDP不需要建立连接,直接发送一个报文即可,不管客户端有没有收到。即使客户端没有收到,客户端也有一个定时任务每隔5s来查询服务的实例列表。
com.alibaba.nacos.naming.push.PushService#onApplicationEvent
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
* client什么时候加入?客户端查询服务的实例列表时
* @see InstanceController#doSrvIpxt(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, int, java.lang.String, boolean, java.lang.String, java.lang.String, boolean)
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
// 遍历所有查询过此服务的客户端列表
for (PushClient client : clients.values()) {
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(),
(ackEntry == null ? null : ackEntry.key));
// 将变更服务的实例列表通过UDP发送给客户端
udpPush(ackEntry);
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
查询服务的实例列表
Nacos客户端会通过调用接口/nacos/v1/ns/instance/list来查询服务端对应服务的实例列表。
com.alibaba.nacos.naming.controllers.InstanceController#list
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
// 查询serviceName对应的实例
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
com.alibaba.nacos.naming.controllers.InstanceController#doSrvIpxt
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// 根据命名空间和服务名查询出服务
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {
// 将监听实例变更的客户端加入到一个clientMap中,后续如果服务的实例列表状态变更了会遍历此MAP,发送UDP报文通知
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
if (service == null) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
checkIfDisabled(service);
List<Instance> srvedIPs;
// 从服务service中查询出实例列表
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// filter ips using selector:
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
if (CollectionUtils.isEmpty(srvedIPs)) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.set("hosts", JacksonUtils.createEmptyArrayNode());
result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
if (isCheck) {
result.put("reachProtectThreshold", false);
double threshold = service.getProtectThreshold();
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
result.put("reachProtectThreshold", true);
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return JacksonUtils.createEmptyJsonNode();
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
if (healthyOnly && !entry.getKey()) {
continue;
for (Instance instance : ips) {
// remove disabled instance:
if (!instance.isEnabled()) {
continue;
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
查询服务的实例列表直接查询的是注册表,不同namespace,不同group之间的service是无法调用的,同一个service下的不同Cluster可以调用。