
How to use Nacos as a configuration center in Java

2025-02-23 Update From: SLTechnology News&Howtos

This article explains how to use Nacos as a configuration center in Java. It first shows how to publish and subscribe to a configuration with the client API, and then walks through the source code to see how a published change reaches the subscribers.

Publishing and subscribing to configuration

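Let's first look at how to use the API provided by Nacos to publish and subscribe to a configuration.

Publishing a configuration:

    import com.alibaba.nacos.api.NacosFactory;
    import com.alibaba.nacos.api.config.ConfigService;
    import com.alibaba.nacos.api.exception.NacosException;

    public class ConfigPub {
        public static void main(String[] args) throws NacosException {
            final String dataId = "test";
            final String group = "DEFAULT_GROUP";
            ConfigService configService = NacosFactory.createConfigService("localhost:8848");
            configService.publishConfig(dataId, group, "test config body");
        }
    }

Subscribing to a configuration:

    import java.util.concurrent.Executor;

    import com.alibaba.nacos.api.NacosFactory;
    import com.alibaba.nacos.api.config.ConfigService;
    import com.alibaba.nacos.api.config.listener.Listener;
    import com.alibaba.nacos.api.exception.NacosException;

    // The class name is chosen here; the original snippet showed only the main method.
    public class ConfigSub {
        public static void main(String[] args) throws NacosException, InterruptedException {
            final String dataId = "test";
            final String group = "DEFAULT_GROUP";
            ConfigService configService = NacosFactory.createConfigService("localhost:8848");
            configService.addListener(dataId, group, new Listener() {
                @Override
                public Executor getExecutor() {
                    return null;
                }

                @Override
                public void receiveConfigInfo(String configInfo) {
                    System.out.println("receiveConfigInfo:" + configInfo);
                }
            });
            // Keep the process alive so that the listener can receive updates.
            Thread.sleep(Integer.MAX_VALUE);
        }
    }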

According to the demo above, you can see that a configuration file can be located through dataId and group.
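To make this addressing concrete, here is a minimal in-memory sketch, not Nacos code, in which a configuration is stored and looked up by the pair of dataId and group. The class name ConfigStoreSketch and its methods are made up for illustration, and the "+" separator is only an assumption that mirrors the test+DEFAULT_GROUP notation used in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model; not part of the Nacos API.
final class ConfigStoreSketch {
    private final Map<String, String> configs = new ConcurrentHashMap<>();

    // Combine dataId and group into one lookup key (format assumed for this sketch).
    static String key(String dataId, String group) {
        return dataId + "+" + group;
    }

    void publish(String dataId, String group, String content) {
        configs.put(key(dataId, group), content);
    }

    // Returns null when nothing has been published under this dataId and group.
    String get(String dataId, String group) {
        return configs.get(key(dataId, group));
    }

    public static void main(String[] args) {
        ConfigStoreSketch store = new ConfigStoreSketch();
        store.publish("test", "DEFAULT_GROUP", "test config body");
        System.out.println(store.get("test", "DEFAULT_GROUP")); // test config body
        System.out.println(store.get("test", "OTHER_GROUP"));   // null
    }
}
```

The same dataId under a different group is a different configuration, which is why both values are always passed together in the API calls above.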

A closer look at configuration publishing

The client publishes the configuration by calling the server through an HTTP request:

    agent.httpPost(url, headers, params, encode, POST_TIMEOUT);

On the server, the class that handles configuration-related HTTP requests is ConfigController:

    persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
    EventDispatcher.fireEvent(
        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));

You can see that the released configuration is persisted first, and then a change notification is triggered.

Instead of analyzing persistence here, let's take a look at fireEvent:

EventDispatcher.fireEvent:

    static public void fireEvent(Event event) {
        if (null == event) {
            throw new IllegalArgumentException("event is null");
        }
        for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }
    }

Here you can see that the actual work is done by listener.onEvent(event).

What remains is to find out which concrete implementation of AbstractEventListener handles this event.

AbstractEventListener has two main implementation classes:

AsyncNotifyService

LongPollingService

We can tell by the type of the event: the event passed to onEvent here is a ConfigDataChangeEvent, so the implementation class we are looking for is AsyncNotifyService.

Each AbstractEventListener will first add itself to the listeners when initializing.

    final CopyOnWriteArrayList<AbstractEventListener> listeners;

    public AbstractEventListener() {
        /**
         * automatic register
         */
        EventDispatcher.addEventListener(this);
    }
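The dispatch pattern described so far (listeners that register themselves in their constructor, and a fireEvent that calls only the listeners interested in the event type) can be modeled with the JDK alone. The following is a simplified sketch under assumptions, not the Nacos implementation: DispatcherSketch, RecordingListener, and the two event classes are hypothetical names, and interest is reduced to a single event class per listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified model of the dispatch pattern; every name here is hypothetical.
final class DispatcherSketch {
    interface Event {}
    static final class ConfigChanged implements Event {}
    static final class LocalChanged implements Event {}

    private static final List<Listener> LISTENERS = new CopyOnWriteArrayList<>();

    abstract static class Listener {
        Listener() {
            // Self-registration on construction, as the article describes for AbstractEventListener.
            LISTENERS.add(this);
        }
        abstract Class<? extends Event> interest();
        abstract void onEvent(Event event);
    }

    // A listener that simply records the events it receives.
    static final class RecordingListener extends Listener {
        private final Class<? extends Event> type;
        final List<Event> received = new ArrayList<>();

        RecordingListener(Class<? extends Event> type) {
            this.type = type;
        }
        @Override Class<? extends Event> interest() { return type; }
        @Override void onEvent(Event event) { received.add(event); }
    }

    static void fireEvent(Event event) {
        if (event == null) {
            throw new IllegalArgumentException("event is null");
        }
        for (Listener listener : LISTENERS) {
            if (listener.interest() != event.getClass()) {
                continue; // this listener did not subscribe to this event type
            }
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                // One failing listener must not prevent the others from being notified.
                System.err.println(e);
            }
        }
    }

    public static void main(String[] args) {
        RecordingListener async = new RecordingListener(ConfigChanged.class);
        RecordingListener longPolling = new RecordingListener(LocalChanged.class);
        fireEvent(new ConfigChanged());
        System.out.println(async.received.size());       // 1
        System.out.println(longPolling.received.size()); // 0
    }
}
```

Matching on the event class is what lets the reader decide, from the event type alone, which listener will run.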

We can look directly at AsyncNotifyService's onEvent method:

    public void onEvent(Event event) {
        // ConfigDataChangeEvent may be generated concurrently
        if (event instanceof ConfigDataChangeEvent) {
            ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
            long dumpTs = evt.lastModifiedTs;
            String dataId = evt.dataId;
            String group = evt.group;
            String tenant = evt.tenant;
            String tag = evt.tag;
            // e.g. Member{address='192.168.31.192:8848'}
            Collection<Member> ipList = memberManager.allMembers();
            // in fact, any type of queue would do here
            Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
            for (Member member : ipList) {
                queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs,
                    member.getAddress(), evt.isBeta));
            }
            EXECUTOR.execute(new AsyncTask(httpclient, queue));
        }
    }

The method above mainly does the following:

It gets all Nacos server nodes and executes the asynchronous task AsyncTask for them.

AsyncTask takes the NotifySingleTask for each node from the queue, then sends an HTTP request to that node to call the service that notifies it of the configuration change. That service is implemented in CommunicationController.

    /**
     * notify configuration information changes
     */
    @GetMapping("/dataChange")

This method is analyzed later.
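The fan-out step can be sketched as follows: one task is queued per cluster member, and a worker drains the queue asynchronously. This is a simplified model under assumptions, not the Nacos source. NotifyTask stands in for NotifySingleTask, drain stands in for AsyncTask, the second member address is made up, and the HTTP call is replaced by building a description string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-ins for the article's NotifySingleTask and AsyncTask.
final class FanOutSketch {
    static final class NotifyTask {
        final String dataId;
        final String group;
        final String target;

        NotifyTask(String dataId, String group, String target) {
            this.dataId = dataId;
            this.group = group;
            this.target = target;
        }
    }

    // Build one task per cluster member.
    static Queue<NotifyTask> buildQueue(String dataId, String group, List<String> members) {
        Queue<NotifyTask> queue = new LinkedList<>();
        for (String member : members) {
            queue.add(new NotifyTask(dataId, group, member));
        }
        return queue;
    }

    // Drain the queue; a real implementation would send one HTTP request per task.
    static List<String> drain(Queue<NotifyTask> queue) {
        List<String> notified = new ArrayList<>();
        NotifyTask task;
        while ((task = queue.poll()) != null) {
            notified.add("notify " + task.target + " about " + task.dataId + "+" + task.group);
        }
        return notified;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The second address is invented for the example.
            Queue<NotifyTask> queue = buildQueue("test", "DEFAULT_GROUP",
                Arrays.asList("192.168.31.192:8848", "192.168.31.193:8848"));
            Future<List<String>> result = executor.submit(() -> drain(queue));
            for (String line : result.get()) {
                System.out.println(line);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

Because every member gets its own task, the node that published the change does not need to know which node holds a given subscriber.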

A closer look at configuration subscription

Initialize:

When NacosConfigService initializes, ClientWorker is constructed and two thread pools are started through ClientWorker.

    worker = new ClientWorker(agent, configFilterChainManager, properties);

The first thread pool executes checkConfigInfo() every 10 ms:

    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);

Let's see what exactly checkConfigInfo does.

    public void checkConfigInfo() {
        // divide the tasks
        int listenerSize = cacheMap.get().size();
        // round up to get the number of batches, which limits how many configurations each LongPollingRunnable handles
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // think carefully about how to determine whether the task is already executing
                // the task list appears to be unordered, so changes to it may cause problems
                executorService.execute(new LongPollingRunnable(i));
                // the i here is the taskId
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

The main job here is to submit LongPollingRunnable tasks to the second thread pool.

Each LongPollingRunnable handles at most 3000 configurations.
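The batching rule is plain arithmetic, shown below as a small worked sketch. It assumes the per-task size is 3000, as stated above, and that configurations are assigned to tasks in order of arrival. BatchingSketch and its method names are hypothetical.

```java
// Worked arithmetic for the batching rule; all names are hypothetical.
final class BatchingSketch {
    // Assumed per-task size, taken from the figure of 3000 given in the article.
    static final double PER_TASK = 3000D;

    // Number of long-polling tasks needed for the given number of listened configurations.
    static int taskCount(int listenerSize) {
        // Dividing by a double keeps the fraction, so Math.ceil can round up.
        // With integer division, 3001 / 3000 would already be 1 before rounding.
        return (int) Math.ceil(listenerSize / PER_TASK);
    }

    // Index of the task that handles the configuration at the given 0-based position.
    static int taskIdFor(int position) {
        return position / (int) PER_TASK;
    }

    public static void main(String[] args) {
        System.out.println(taskCount(1));    // 1
        System.out.println(taskCount(3000)); // 1
        System.out.println(taskCount(3001)); // 2
        System.out.println(taskIdFor(2999)); // 0
        System.out.println(taskIdFor(3000)); // 1
    }
}
```

A new LongPollingRunnable is therefore only needed when the number of listened configurations crosses a multiple of 3000.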

Let's take a look at the implementation of LongPollingRunnable

    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                ...
            }
        }

The configuration information is held in cacheMap, which is loaded from disk.

Using taskId, the current LongPollingRunnable picks the configurations it is responsible for out of cacheMap and puts them into the cacheDatas collection.

Let's see where the taskId is set.

    int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
    cache.setTaskId(taskId);

You can see that this matches the above: every 3000 configurations share the same taskId, because each LongPollingRunnable handles 3000 configurations.

    // check server config: request the changed configurations from the server
    List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

    // Get the list of dataIds whose values have changed from the server. Only dataId and group are valid in the returned objects. Guaranteed not to return null.
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);

Here, the client that subscribes to the configuration sends an HTTP long-polling request to the server to obtain the changed configuration information.

A long-polling request does not return immediately; it returns when a configuration changes. The timeout is set to 30 s: if no configuration is updated within the timeout, the request returns anyway, and the client then initiates a new long-polling request.

    HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener",
        headers, params, agent.getEncode(), readTimeoutMs);

The long-polling timeout defaults to 30 s:

    timeout = Math.max(NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
        Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
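The expression above combines two rules: fall back to a default when the property is missing or not a number, then never go below a minimum. A JDK-only sketch of that logic follows. The 30 s default comes from the article; the property name, the minimum value, and the helper toInt (a stand-in for NumberUtils.toInt) are assumptions made for this example.

```java
import java.util.Properties;

// Hypothetical model of the timeout resolution; the constants are assumptions.
final class LongPollTimeoutSketch {
    static final String KEY = "configLongPollTimeout"; // assumed property name
    static final int DEFAULT_TIMEOUT_MS = 30000;       // the 30 s default stated in the article
    static final int MIN_TIMEOUT_MS = 10000;           // assumed lower bound

    // Stand-in for NumberUtils.toInt: return the fallback for null or non-numeric input.
    static int toInt(String value, int fallback) {
        if (value == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // Apply the default first, then clamp to the minimum.
    static int resolve(Properties properties) {
        int configured = toInt(properties.getProperty(KEY), DEFAULT_TIMEOUT_MS);
        return Math.max(configured, MIN_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        System.out.println(resolve(properties)); // 30000 (property not set)
        properties.setProperty(KEY, "5000");
        System.out.println(resolve(properties)); // 10000 (raised to the minimum)
        properties.setProperty(KEY, "45000");
        System.out.println(resolve(properties)); // 45000
    }
}
```

The clamp prevents a very small configured value from turning long polling into rapid short polling.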

The specific service implementation class is in ConfigController:

    @PostMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
        ....
        // do long-polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

The doPollingConfig method:

    // the server handles long-polling requests
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }

Use a thread pool to process requests:

    scheduler.execute(
        new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));

ClientLongPolling is a task class that runs on a thread.

When it runs, it first schedules a delayed task and then adds itself to the queue: allSubs.add(this)

All long-polling requests are maintained in allSubs.

So there must be a place that consumes the requests in the allSubs queue.

That place is the onEvent method:

LongPollingService is in fact one of the AbstractEventListener implementations mentioned above, so it also implements onEvent.

    @Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            if (event instanceof LocalDataChangeEvent) {
                LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }
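The hold-and-release mechanism described so far (a request is parked in allSubs, and is answered either when a matching change event arrives or when its timeout expires) can be modeled with the JDK alone. This is a simplified sketch under assumptions, not the Nacos implementation: LongPollHoldSketch, PendingPoll, addClient, and onDataChange are hypothetical names, and a CompletableFuture stands in for the held HTTP response.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified model of server-side long polling; class and method names are hypothetical.
final class LongPollHoldSketch {
    static final class PendingPoll {
        final String groupKey;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();

        PendingPoll(String groupKey) {
            this.groupKey = groupKey;
        }
    }

    private final Queue<PendingPoll> allSubs = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Park the request: schedule the timeout reply first, then join the queue.
    CompletableFuture<List<String>> addClient(String groupKey, long timeoutMs) {
        PendingPoll poll = new PendingPoll(groupKey);
        scheduler.schedule(() -> {
            allSubs.remove(poll);
            // Nothing changed within the timeout: answer with an empty list.
            poll.response.complete(Collections.emptyList());
        }, timeoutMs, TimeUnit.MILLISECONDS);
        allSubs.add(poll);
        return poll.response;
    }

    // Called when a configuration changes: answer and drop every matching request.
    void onDataChange(String groupKey) {
        for (Iterator<PendingPoll> iter = allSubs.iterator(); iter.hasNext(); ) {
            PendingPoll poll = iter.next();
            if (poll.groupKey.equals(groupKey)) {
                iter.remove();
                poll.response.complete(Collections.singletonList(groupKey));
            }
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        LongPollHoldSketch server = new LongPollHoldSketch();
        try {
            CompletableFuture<List<String>> changed = server.addClient("test+DEFAULT_GROUP", 30_000);
            CompletableFuture<List<String>> idle = server.addClient("other+DEFAULT_GROUP", 200);
            server.onDataChange("test+DEFAULT_GROUP");
            System.out.println("changed: " + changed.join()); // changed: [test+DEFAULT_GROUP]
            System.out.println("idle: " + idle.join());       // idle: []
        } finally {
            server.shutdown();
        }
    }
}
```

A future can only be completed once, so whichever of the two paths (change or timeout) reaches a request first decides its answer.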

This onEvent method handles configuration changes, and the main logic is in DataChangeTask.

From the requests maintained in allSubs, it picks those listening on the same dataId+group, for example (test+DEFAULT_GROUP).

It then returns the response for each of those long-polling requests.

    for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
        ClientLongPolling clientSub = iter.next();
        // groupKey: test+DEFAULT_GROUP
        if (clientSub.clientMd5Map.containsKey(groupKey)) {
            ...
            iter.remove(); // delete the subscription relationship
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                (System.currentTimeMillis() - changeTime),
                "in-advance",
                RequestUtil.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                "polling",
                clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
            clientSub.sendResponse(Arrays.asList(groupKey));
        }
    }

Where is the onEvent method in LongPollingService triggered?

Of course, it is triggered after the configuration is published. Remember the dataChange service in CommunicationController?

After the configuration is published, the dataChange service on the Nacos server is invoked through an HTTP request, and through it the long-polling requests held by that server can be notified.

Because this call iterates over all Nacos server nodes, the long-polling request for the changed configuration is reached no matter which node holds it.

    /**
     * notify configuration information changes
     */
    @GetMapping("/dataChange")

Here, a method in DumpService is called to save the configuration to disk and to cache its MD5.

    DiskUtil.saveToDisk(dataId, group, tenant, content);

    public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
        CacheItem cache = makeSure(groupKey);
        if (cache.md5 == null || !cache.md5.equals(md5)) {
            cache.md5 = md5;
            cache.lastModifiedTs = lastModifiedTs;
            EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
        }
    }

You can see that when the cached MD5 changes, fireEvent is called with a LocalDataChangeEvent, which is the event that LongPollingService.onEvent handles.
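The MD5 comparison is what prevents a republish of identical content from waking up the subscribers. A minimal sketch of that check, using only the JDK, is shown below. Md5ChangeSketch and its members are hypothetical, and recording the groupKey in a list stands in for firing the event.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of change detection by digest; not the Nacos implementation.
final class Md5ChangeSketch {
    private final Map<String, String> md5ByGroupKey = new ConcurrentHashMap<>();
    // Stand-in for firing an event: each entry is the groupKey of one fired change.
    final List<String> firedEvents = new ArrayList<>();

    static String md5Hex(String content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest is not available", e);
        }
    }

    // Returns true, and records an event, only when the digest differs from the cached one.
    boolean update(String groupKey, String content) {
        String md5 = md5Hex(content);
        String previous = md5ByGroupKey.put(groupKey, md5);
        if (md5.equals(previous)) {
            return false;
        }
        firedEvents.add(groupKey);
        return true;
    }

    public static void main(String[] args) {
        Md5ChangeSketch cache = new Md5ChangeSketch();
        System.out.println(cache.update("test+DEFAULT_GROUP", "v1")); // true
        System.out.println(cache.update("test+DEFAULT_GROUP", "v1")); // false
        System.out.println(cache.update("test+DEFAULT_GROUP", "v2")); // true
        System.out.println(cache.firedEvents.size());                 // 2
    }
}
```

Only the first and third calls count as changes, so only they would release pending long-polling requests.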

This concludes the walkthrough of how to use Nacos as a configuration center in Java. Theory is best absorbed together with practice, so go and try it yourself.
