Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the reading method of Nacos source code?

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly introduces the relevant knowledge of "what is the Nacos source code reading method". The editor shows you the operation process through an actual case. The operation method is simple, fast and practical. I hope this article "what is the Nacos source code reading method" can help you solve the problem.

First of all, I would like to present you with a high-definition source code map I combed, so that you can have an overall understanding of the source code of nacos.

With this picture, it's easy to look at the nacos source code.

How to find a starting point

First of all, we have to find a pointcut to get into the nacos source code, so start with nacos dependencies

Com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery

If you enter the dependency file, you will find that it depends on another component:

Com.alibaba.cloud spring-cloud-alibaba-nacos-discovery

When we get into dependency, we find that it looks like this:

From this diagram, we find a familiar configuration file spring.factories, which is a necessary file for sringboot automatic assembly.

Org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\ com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration \ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfigurationorg.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

Since this one is mainly about the source code of service registration, we can just NacosServiceRegistryAutoConfiguration the auto-assembly files.

Public class NacosServiceRegistryAutoConfiguration {@ Bean public NacosServiceRegistry nacosServiceRegistry (NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosServiceRegistry (nacosDiscoveryProperties);} @ ConditionalOnBean (AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration (NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {return new NacosRegistration (nacosDiscoveryProperties, context) Public NacosAutoServiceRegistration nacosAutoServiceRegistration (NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {return new NacosAutoServiceRegistration (registry, autoServiceRegistrationProperties, registration);}

What we see are three bean injections. Here is a tip to look at the source code: for the bean class declared in the auto-assembly file, we only need to look at the bean with auto, which is often the entrance; NacosAutoServiceRegistration with auto, let's click into it to see what's in it:

@ Override protected void register () {if (! this.registration.getNacosDiscoveryProperties () .isRegisterEnabled ()) {log.debug ("Registration disabled."); return;} if (this.registration.getPort ()

< 0) { this.registration.setPort(getPort().get()); } super.register(); } 里面有一个register()方法,我在这里打个断点,因为我猜测这个就是注册的入口,我现在使用debug模式,启动一个服务,看它会不会调用这个方法: 客户端注册 这里贴上我debug后,进入register方法的调用链截图

See the call chain, see a callback method of onApplicationEvent, and find the class AbstractAutoServiceRegistration in which the method is located

This class inherits ApplicationListener, a multicast listener. After spring starts, it issues a multicast event, and then calls back the onApplicationEvent method that implements the multicast component. Let's start with this method:

Public void onApplicationEvent (WebServerInitializedEvent event) {bind (event); / / bind the port and start} @ Deprecatedpublic void bind (WebServerInitializedEvent event) {/ / set the port this.port.compareAndSet (0, event.getWebServer () .getPort ()); / / start the client registration component this.start () } public void start () {/ / omit branch code / / call registration register () }

Because springcloud provides a variety of registry extensions, but we only refer to the nacos registry here, the register method of NacosServiceRegistry is called directly here:

Public void register (Registration registration) {/ / omit branch code / / get service id String serviceId = registration.getServiceId (); / / get group configuration String group = nacosDiscoveryProperties.getGroup (); / / encapsulate service instance Instance instance = getNacosInstanceFromRegistration (registration) / / call the registerInstance method of naming service to register the instance namingService.registerInstance (serviceId, group, instance);}

Go to the registerInstance method

Public void registerInstance (String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral ()) {/ / omit branch code / / establish a heartbeat with the server. By default, this.beatReactor.addBeatInfo (NamingUtils.getGroupedName (serviceName, groupName), beatInfo) is sent every 5 seconds. } / / send the registration request this.serverProxy.registerService (NamingUtils.getGroupedName (serviceName, groupName), groupName, instance) to the server through http;}

Serverproxy sends a request to the server interface ("/ nacos/v1/ns/instance") by calling the reapi method that encapsulates the http

Public void registerService (String serviceName, String groupName, Instance instance) throws NacosException {LogUtils.NAMING_LOGGER.info ("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object [] {this.namespaceId, serviceName, instance}); Map params = new HashMap (9); params.put ("namespaceId", this.namespaceId); params.put ("serviceName", serviceName); params.put ("groupName", groupName) Params.put ("clusterName", instance.getClusterName ()); params.put ("ip", instance.getIp ()); params.put ("port", String.valueOf (instance.getPort (); params.put ("weight", String.valueOf (instance.getWeight ()); params.put ("enable", String.valueOf (instance.isEnabled () Params.put ("healthy", String.valueOf (instance.isHealthy (); params.put ("ephemeral", String.valueOf (instance.isEphemeral (); params.put ("metadata", JSON.toJSONString (instance.getMetadata (); this.reqAPI (UtilAndComs.NACOS_URL_INSTANCE, params, (String) "POST");}

We know that nacos is often deployed in the form of clusters, so how does the client choose one of the nodes to send? it must realize the logic of load balancing. let's click reqAPI to see how it is implemented.

If (servers! = null & &! servers.isEmpty ()) {Random random = new Random (System.currentTimeMillis ()); / / get an index randomly. Servers holds all nacos node addresses int index = random.nextInt (servers.size ()). / / traverse all nodes, find the server of the corresponding location from the servers according to the index value, and make the request call. If the call is successful, it will return, otherwise it will traverse in turn until the request succeeds for (int I = 0; I).

< servers.size(); ++i) { String server = (String)servers.get(index); try { return this.callServer(api, params, server, method); } catch (NacosException var11) { exception = var11; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11); } catch (Exception var12) { exception = var12; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12); } // index+1 然后取模 是保证index不会越界 index = (index + 1) % servers.size(); } throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage()); } 到这里,客户端注册的代码已经分析完了,不过这还不是本篇的结束,我们还得继续分析服务端是如何处理客户端发送过来的注册请求: 服务端处理客户端注册请求 如果需要查看服务端源码的话,则需要将nacos源码下下来 下载地址 我们从服务注册api接口地址(/nacos/v1/ns/instance),可以找到对应的controller为(com.alibaba.nacos.naming.controllers.InstanceController) 因为注册实例发送的是post请求,所以直接找被postmapping注解的register方法 @CanDistro @PostMapping public String register(HttpServletRequest request) throws Exception {// 获取服务名 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);// 获取命名空间id String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);// 注册实例serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); return "ok"; } 我们点击进入到registerInstance方法: public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); }// 执行添加实例的操作 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }分析 在nacos中,注册实例后,还需要将注册信息同步到其他节点,所有在nacos中存在两种同步模式AP和CP,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上; nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。 com.alibaba.nacos.naming.core.ServiceManager.addInstance() public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 生成服务的key String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 获取服务 Service service = getService(namespaceId, serviceName); // 使用同步锁处理 synchronized (service) { List instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 调用consistencyService.put 处理同步过来的服务 consistencyService.put(key, instances); } } 我们在进入到consistencyService.put方法中

When you click on the put method, you will see that there are three implementation classes. From the context (or debug mode), it can be inferred that the DelegateConsistencyServiceImpl implementation class is referenced here.

@ Override public void put (String key, Record value) throws NacosException {/ / after entering the put method, you can know whether to use ap or cp to synchronize mapConsistencyService (key) .put (key, value);}

From the following method, you can judge whether to use ap or cp to synchronize registration information through key, where key is composed of ephemeral fields

Private ConsistencyService mapConsistencyService (String key) {return KeyBuilder.matchEphemeralKey (key)? EphemeralConsistencyService: persistentConsistencyService;}

AP synchronized process (ephemeralConsistencyService) Local server processes registration information & synchronizes registration information to other nodes

@ Override public void put (String key, Record value) throws NacosException {/ / process local registration list onPut (key, value); / / add blocking tasks, synchronize information to other cluster nodes taskDispatcher.addTask (key);} process local registration nodes

Nacos adds key as a task to the blocking queue tasks in notifer and executes using a single thread, where notifer is initialized and is placed in the thread pool as a thread (only one core thread is set in the thread pool)

Here is a point to tell you: in most distributed frameworks, single-threaded blocking queues are used to deal with time-consuming tasks, which can solve the concurrency problem on the one hand and write conflicts caused by concurrency on the other.

The main processing logic in the thread is to loop through the contents of the blocking queue, then process the registration information and update it to the memory registration list.

Synchronize registration information to other cluster nodes

Nacos also stores the registered key as a task in the taskShedule blocking queue in TaskDispatcher, and then starts the thread loop to read the blocking queue:

@ Override public void run () {List keys = new ArrayList (); while (true) {String key = queue.poll (partitionConfig.getTaskDispatchPeriod (), TimeUnit.MILLISECONDS); / / omit judgment code / / add synchronous key keys.add (key) / / count dataSize++ / / determine whether the key size of synchronization is equal to the limit set by batch synchronization or whether the last synchronization time is greater than the configured interval period, if any one is satisfied. Then start synchronizing if (dataSize = = partitionConfig.getBatchSyncKeyCount () | | (System.currentTimeMillis ()-lastDispatchTime) > partitionConfig.getTaskDispatchPeriod ()) {/ / traversing all cluster nodes Call http directly to synchronize for (Server member: dataSyncer.getServers ()) {if (NetUtils.localServer (). Equals (member.getKey () {continue } SyncTask syncTask = new SyncTask (); syncTask.setKeys (keys); syncTask.setTargetServer (member.getKey ()) If (Loggers.DISTRO.isDebugEnabled () & & StringUtils.isNotBlank (key)) {Loggers.DISTRO.debug ("add sync task: {}", JSON.toJSONString (syncTask));} dataSyncer.submit (syncTask, 0) } / / record the synchronization time lastDispatchTime = System.currentTimeMillis (); / / count zero dataSize = 0;}}

The process of using ap for synchronization is simple, but there are two design ideas to solve the problem of single key synchronization:

If a new key is pushed up, the nacos initiates a synchronization, which results in a waste of network resources, because only one key or several key are synchronized at a time.

Synchronize a small number of key solutions: only when a specified number of key is accumulated before the batch synchronization exceeds the configured limit, ignore the number of key and directly initiate the process of synchronization in CP mode (RaftConsistencyServiceImpl)

Cp mode pursues data consistency. For data consistency, it is necessary to select a leader, which is synchronized by leader first, and then leader notifies follower to obtain the latest registration node (or actively push it to follower).

Nacos uses the raft protocol to elect leader to implement the cp pattern.

Also enter the put method of RaftConsistencyServiceImpl

@ Override public void put (String key, Record value) throws NacosException {try {raftCore.signalPublish (key, value);} catch (Exception e) {Loggers.RAFT.error ("Raft put failed.", e); throw new NacosException (NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);}}

Going into the raftCore.signalPublish method, I extracted a few key pieces of code

/ / first determine whether the current nacos node is leader, if not leader, get the ip of the leader node, and then forward the request to leader for processing, otherwise go down to if (! isLeader ()) {JSONObject params = new JSONObject (); params.put ("key", key); params.put ("value", value); Map parameters = new HashMap (1) Parameters.put ("key", key); raftProxy.proxyPostLarge (getLeader (). Ip, API_PUB, params.toJSONString (), parameters); return;}

Also use the same queue to deal with the local registration list

OnPublish (datum, peers.local ()); public void onPublish (Datum datum, RaftPeer source) throws Exception {/ / add synchronous key tasks to the blocking queue notifier.addTask (datum.key, ApplyAction.CHANGE); Loggers.RAFT.info ("data added/updated, key= {}, term= {}", datum.key, local.term);}

Traverses all cluster nodes and sends http synchronization requests

For (final String server: peers.allServersIncludeMyself ()) {/ / if it is leader, do not synchronize if (isLeader (server)) {latch.countDown (); continue } / / Assembly url sends synchronization requests to other cluster nodes final String url = buildURL (server, API_ON_PUB) HttpClient.asyncHttpPostLarge (url, Arrays.asList ("key=" + key), content, new AsyncCompletionHandler () {@ Override public Integer onCompleted (Response response) throws Exception {if (response.getStatusCode ()! = HttpURLConnection.HTTP_OK) {Loggers.RAFT.warn ("[RAFT] failed to publish data to peer, datumId= {}) Peer= {}, http code= {} ", datum.key, server, response.getStatusCode () Return 1;} latch.countDown (); return 0;} @ Override public STATE onContentWriteCompleted () {return STATE.CONTINUE This is the end of the content about "what is the method of reading Nacos source code?". Thank you for your reading. If you want to know more about the industry, you can follow the industry information channel. The editor will update different knowledge points for you every day.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report