image-20230329140220294

  • Spring Cloud集成Nacos的实现过程 在Spring-cloud-commons包下的META-INF/spring.factories中包含自动装配的配置信息,其中AutoServiceRegistrationAutoConfiguration就是服务注册相关的配置类,
  • 在AutoServiceRegistrationAutoConfiguration配置类中,注入了AutoServiceRegistration实例。AbstractAutoServiceRegistration抽象类实现了该接口,最终NacosAutoServiceRegistration继承AbstractAutoServiceRegistration

客户端最终调用了reqApi方法,把信息封装成params的map向Nacos Server /nacos/v1/ns/instance 接口发送了一个POST请求,把当前实例注册进去,到这里整个客户端的核心注册流程就分析完了

nacos服务端提供接口请求地址,/v1/ns/instance,具体代码在nacos-naming下的InstanceController

主要逻辑为

  • 创建一个空服务,初始化serviceMap
  • getService,从serviceMap中根据namespaceid和serviceName得到一个服务对象
  • 调用addInstance添加到服务对象

Nacos Server 的注册表结构

说到服务注册,那么首先需要关注的是注册表结构是怎么设计的,Nacos的注册表结构设计方式是一个双重Map结构,定义如下:

img

源码中注释其实已经解释这个双重Map数据结构的存储结构,最外层的Map的Key为Namespace,Value为一个Map,内层的Map的Key为group::serviceName,Value为Service对象。Service对象中也有一个Map的数据结构,如下:

img

Map的Key值为Cluster的名字,Value为Cluster对象,Cluster对象中有两个Set的数据结构,用来存储Instance,这个Instance才是真正的客户端注册过来的实例信息。

img

客户端发送http post请求,请求地址为/v1/ns/instance,服务端的InstanceController接收和响应客户端的请求。

InstanceController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

//从request中获取客户端传递的namespaceId
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//从request中获取客户端传递的serviceName
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//检查服务名是否合法
NamingUtils.checkServiceNameFormat(serviceName);

//从request中获取客户端传递的参数构造instance对象
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
//实际调的是 serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
InstanceOperatorServiceImpl.getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}

ServiceManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

//1.创建一个空的服务
createEmptyService(namespaceId, serviceName, instance.isEphemeral());

//2.从缓存中获取服务
Service service = getService(namespaceId, serviceName);

//3.检查服务是否为null
checkServiceIsNull(service, namespaceId, serviceName);

//4.添加实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

createEmptyService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
//如果不存在就创建一个服务
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
//从内部的serviceMap中获取service
Service service = getService(namespaceId, serviceName);
//假如我们是第一次注册服务,那么这里从缓存中得到的就是null
if (service == null) {

Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
//实例化service
service = new Service();
//设置服务名
service.setName(serviceName);
//设置服务归属的命名空间
service.setNamespaceId(namespaceId);
//设置服务归属的组
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
//设置服务的最后修改时间为当前系统时间
service.setLastModifiedMillis(System.currentTimeMillis());
//重新计算服务的MD5值
service.recalculateChecksum();
//由于上一步传入进来的cluster等于null,所以此处不执行
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
//验证服务是否合法,比如服务名
service.validate();

//添加服务并且初始化服务
putServiceAndInit(service);
//local等于true,临时实例,以下代码不执行
if (!local) {
addOrReplaceService(service);
}
}
}

addInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
//根据namespaceId、serviceName以及是否是临时实例,创建一个key,由于此时我们注册的是临时实例,所以key为com.alibaba.nacos.naming.iplist.ephemeral.{namespaceId}.##.{serviceName}
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

//从缓存中获取上一步中创建的空service
Service service = getService(namespaceId, serviceName);

synchronized (service) {
//更新实例并返回该服务下的所有实例列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

//创建一个instances对象,用于传递实例列表
Instances instances = new Instances();
instances.setInstanceList(instanceList);

//触发一个服务变更的通知以及向集群中的其他nacos节点同步实例数据
consistencyService.put(key, instances);
}
}


for (Instance instance : ips) {
//首次注册,clusterMap中不包含当前实例的clusterName的key
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
//实例化cluster,并通过构造函数传递集群的名字,以及当前集群归属于哪一个service
Cluster cluster = new Cluster(instance.getClusterName(), service);
//集群初始化,内部还是开启了一个心跳检查的定时任务,先不管
cluster.init();
//向service内部的clusterMap中设置cluster
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn(“cluster: {} not found, ip: {}, will create new cluster with default configuration.”,
instance.getClusterName(), instance.toJson());
}

init

注册了一个notifier的定时任务,Notifier实现了Runnable接口,

它的run方法:如果ArrayBlockingQueue中没有数据,take方法会阻塞,直到从队列中取到数据,handle(pair);

它的handle方法:遍历listeners,if (action == DataOperation.CHANGE) {
//调用listener的onChange方法
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}

onChange:updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
//创建一个空的map,用户存储实例数据
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
//遍历clusterMap,copyOnWrite,考虑并发读写问题
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}

//遍历待添加的实例列表
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}

//如果实例归属的集群名等于空
if (StringUtils.isEmpty(instance.getClusterName())) {
//则使用默认的集群名DEFAULT
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}

//如果服务的clusterMap中不包含clusterName key,由于在前几步中已经添加过了
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
//实例化cluster
Cluster cluster = new Cluster(instance.getClusterName(), this);
//初始化
cluster.init();
//添加到clusterMap中
getClusterMap().put(instance.getClusterName(), cluster);
}

//首次注册,等于null
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}

//将实例添加到对应的cluster中
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}

//遍历ipMap,并将新注册的实例添加到clusterMap中
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
//请自行跟踪updateIps方法,下文的问题2的解答在此处
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}

//更新service的最后修改时间为当前系统时间
setLastModifiedMillis(System.currentTimeMillis());
//底层通过UDP socket 向所有订阅了该服务的客户端推送服务,不在主线中,先不管
getPushService().serviceChanged(this);

//v2版本新增
ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
StringBuilder stringBuilder = new StringBuilder();

//遍历所有实例,包含临时和持久化实例,输出日志
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
}

Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());

}