
- Spring Cloud集成Nacos的实现过程 在Spring-cloud-commons包下的META-INF/spring.factories中包含自动装配的配置信息,其中AutoServiceRegistrationAutoConfiguration就是服务注册相关的配置类,
- 在AutoServiceRegistrationAutoConfiguration配置类中,注入了AutoServiceRegistration实例。AbstractAutoServiceRegistration抽象类实现了该接口,最终NacosAutoServiceRegistration继承AbstractAutoServiceRegistration
客户端最终调用了reqApi方法,把信息封装成params的map向Nacos Server /nacos/v1/ns/instance 接口发送了一个POST请求,把当前实例注册进去,到这里整个客户端的核心注册流程就分析完了
nacos服务端提供接口请求地址,/v1/ns/instance,具体代码在nacos-naming下的InstanceController
主要逻辑为
- 创建一个空服务,初始化serviceMap
- getService,从serviceMap中根据namespaceid和serviceName得到一个服务对象
- 调用addInstance添加到服务对象
Nacos Server 的注册表结构
说到服务注册,那么首先需要关注的是注册表结构是怎么设计的,Nacos的注册表结构设计方式是一个双重Map结构,定义如下:

源码中注释其实已经解释这个双重Map数据结构的存储结构,最外层的Map的Key为Namespace,Value为一个Map,内层的Map的Key为group::serviceName,Value为Service对象。Service对象中也有一个Map的数据结构,如下:

Map的Key值为Cluster的名字,Value为Cluster对象,Cluster对象中有两个Set的数据结构,用来存储Instance,这个Instance才是真正的客户端注册过来的实例信息。

客户端发送http post请求,请求地址为/v1/ns/instance,服务端的InstanceController
接收和响应客户端的请求。
InstanceController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { //从request中获取客户端传递的namespaceId final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); //从request中获取客户端传递的serviceName final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); //检查服务名是否合法 NamingUtils.checkServiceNameFormat(serviceName); //从request中获取客户端传递的参数构造instance对象 final Instance instance = HttpRequestInstanceBuilder.newBuilder() .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build(); //实际调的是 serviceManager.registerInstance(namespaceId, serviceName, coreInstance); InstanceOperatorServiceImpl.getInstanceOperator().registerInstance(namespaceId, serviceName, instance); return "ok"; }
|
ServiceManager
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //1.创建一个空的服务 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); //2.从缓存中获取服务 Service service = getService(namespaceId, serviceName); //3.检查服务是否为null checkServiceIsNull(service, namespaceId, serviceName); //4.添加实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
|
createEmptyService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { //如果不存在就创建一个服务 createServiceIfAbsent(namespaceId, serviceName, local, null); } public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { //从内部的serviceMap中获取service Service service = getService(namespaceId, serviceName); //假如我们是第一次注册服务,那么这里从缓存中得到的就是null if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); //实例化service service = new Service(); //设置服务名 service.setName(serviceName); //设置服务归属的命名空间 service.setNamespaceId(namespaceId); //设置服务归属的组 service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown //设置服务的最后修改时间为当前系统时间 service.setLastModifiedMillis(System.currentTimeMillis()); //重新计算服务的MD5值 service.recalculateChecksum(); //由于上一步传入进来的cluster等于null,所以此处不执行 if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } //验证服务是否合法,比如服务名 service.validate(); //添加服务并且初始化服务 putServiceAndInit(service); //local等于true,临时实例,以下代码不执行 if (!local) { addOrReplaceService(service); } } }
|
addInstance
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { //根据namespaceId、serviceName以及是否是临时实例,创建一个key,由于此时我们注册的是临时实例,所以key为com.alibaba.nacos.naming.iplist.ephemeral.{namespaceId}.##.{serviceName} String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //从缓存中获取上一步中创建的空service Service service = getService(namespaceId, serviceName); synchronized (service) { //更新实例并返回该服务下的所有实例列表 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); //创建一个instances对象,用于传递实例列表 Instances instances = new Instances(); instances.setInstanceList(instanceList); //触发一个服务变更的通知以及向集群中的其他nacos节点同步实例数据 consistencyService.put(key, instances); } }
|
for (Instance instance : ips) {
//首次注册,clusterMap中不包含当前实例的clusterName的key
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
//实例化cluster,并通过构造函数传递集群的名字,以及当前集群归属于哪一个service
Cluster cluster = new Cluster(instance.getClusterName(), service);
//集群初始化,内部还是开启了一个心跳检查的定时任务,先不管
cluster.init();
//向service内部的clusterMap中设置cluster
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn(“cluster: {} not found, ip: {}, will create new cluster with default configuration.”,
instance.getClusterName(), instance.toJson());
}
init
注册了一个notifier的定时任务,Notifier
实现了Runnable
接口,
它的run
方法:如果ArrayBlockingQueue中没有数据,take方法会阻塞,直到从队列中取到数据,handle(pair);
它的handle
方法:遍历listeners,if (action == DataOperation.CHANGE) {
//调用listener的onChange方法
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
onChange:updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| public void updateIPs(Collection<Instance> instances, boolean ephemeral) { //创建一个空的map,用户存储实例数据 Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); //遍历clusterMap,copyOnWrite,考虑并发读写问题 for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } //遍历待添加的实例列表 for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } //如果实例归属的集群名等于空 if (StringUtils.isEmpty(instance.getClusterName())) { //则使用默认的集群名DEFAULT instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } //如果服务的clusterMap中不包含clusterName key,由于在前几步中已经添加过了 if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); //实例化cluster Cluster cluster = new Cluster(instance.getClusterName(), this); //初始化 cluster.init(); //添加到clusterMap中 getClusterMap().put(instance.getClusterName(), cluster); } //首次注册,等于null List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } //将实例添加到对应的cluster中 clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } //遍历ipMap,并将新注册的实例添加到clusterMap中 for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); //请自行跟踪updateIps方法,下文的问题2的解答在此处 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } //更新service的最后修改时间为当前系统时间 setLastModifiedMillis(System.currentTimeMillis()); //底层通过UDP socket 向所有订阅了该服务的客户端推送服务,不在主线中,先不管 getPushService().serviceChanged(this); //v2版本新增 ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral); StringBuilder stringBuilder = new StringBuilder(); //遍历所有实例,包含临时和持久化实例,输出日志 for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(','); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); }
|