2025-02-23 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how Nacos works as a configuration center in Java: how a configuration is published and subscribed to through the client API, and how the client and server implement change notification behind that API.
Publishing and subscribing to a configuration
Let's first look at how to publish and subscribe to a configuration with the API that Nacos provides.
Publishing a configuration:
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;

public class ConfigPub {
    public static void main(String[] args) throws NacosException {
        final String dataId = "test";
        final String group = "DEFAULT_GROUP";
        ConfigService configService = NacosFactory.createConfigService("localhost:8848");
        // Publish the content under the (dataId, group) pair
        configService.publishConfig(dataId, group, "test config body");
    }
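}
Subscribing to a configuration:
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import java.util.concurrent.Executor;

// ConfigSub is an illustrative class name; the original only shows the main method
public class ConfigSub {
    public static void main(String[] args) throws NacosException, InterruptedException {
        final String dataId = "test";
        final String group = "DEFAULT_GROUP";
        ConfigService configService = NacosFactory.createConfigService("localhost:8848");
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                // No dedicated executor for the callback
                return null;
            }
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("receiveConfigInfo:" + configInfo);
            }
        });
        // Keep the JVM alive so the listener can receive changes
        Thread.sleep(Integer.MAX_VALUE);
    }
}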
According to the demo above, you can see that a configuration file can be located through dataId and group.
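To make the "located through dataId and group" idea concrete, here is a minimal in-memory sketch. It is not Nacos code: the class `ConfigKeySketch`, its map, and its methods are hypothetical, and the `+` separator simply follows the `test+DEFAULT_GROUP` groupKey example quoted in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a toy store keyed by dataId and group, not the Nacos implementation.
class ConfigKeySketch {
    private static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Combines dataId and group into a single lookup key, e.g. "test+DEFAULT_GROUP".
    static String groupKey(String dataId, String group) {
        return dataId + "+" + group;
    }

    // Stores (or overwrites) the content for this dataId/group pair.
    static void publish(String dataId, String group, String content) {
        STORE.put(groupKey(dataId, group), content);
    }

    // Returns the content for this dataId/group pair, or null if none was published.
    static String get(String dataId, String group) {
        return STORE.get(groupKey(dataId, group));
    }

    public static void main(String[] args) {
        publish("test", "DEFAULT_GROUP", "test config body");
        System.out.println(groupKey("test", "DEFAULT_GROUP") + " -> " + get("test", "DEFAULT_GROUP"));
    }
}
```

The same dataId under a different group is a different key, which is why both values are needed to identify one configuration.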
A closer look at configuration publishing
1. The client publishes the configuration by calling the server through an HTTP request:
agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
On the server, the class that handles configuration-related HTTP requests is ConfigController:
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(
    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
You can see that the released configuration is persisted first, and then a change notification is triggered.
Instead of analyzing persistence here, let's take a look at fireEvent:
EventDispatcher.fireEvent:
static public void fireEvent(Event event) {
    if (null == event) {
        throw new IllegalArgumentException("event is null");
    }
    for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
        try {
            listener.onEvent(event);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }
}
Here you can see that the real work is the call to listener.onEvent(event).
So what remains is to find out which concrete implementation of AbstractEventListener handles this event.
AbstractEventListener has two main implementation classes:
AsyncNotifyService
LongPollingService
We can tell them apart by the event type: the event passed to onEvent here is a ConfigDataChangeEvent,
so the implementation class we are looking for is AsyncNotifyService.
Each AbstractEventListener adds itself to the listeners list when it is constructed.
final CopyOnWriteArrayList<AbstractEventListener> listeners;
public AbstractEventListener() {
    /**
     * automatic register
     */
    EventDispatcher.addEventListener(this);
}
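The self-registration and type-based dispatch described above can be sketched in a few lines. This is a simplified model, not the Nacos classes: `DispatcherSketch`, the `interest()` method, and the two event types are names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative model of a dispatcher whose listeners register themselves on construction.
class DispatcherSketch {
    interface Event {}
    static class ConfigChanged implements Event {}   // stands in for ConfigDataChangeEvent
    static class LocalChanged implements Event {}    // stands in for LocalDataChangeEvent

    static final List<Listener> LISTENERS = new CopyOnWriteArrayList<>();
    static final List<String> LOG = new ArrayList<>();

    abstract static class Listener {
        Listener() {
            LISTENERS.add(this); // automatic registration, as in the constructor above
        }
        abstract Class<? extends Event> interest(); // the event type this listener handles
        abstract void onEvent(Event event);
    }

    static class AsyncNotify extends Listener {
        Class<? extends Event> interest() { return ConfigChanged.class; }
        void onEvent(Event event) { LOG.add("AsyncNotify"); }
    }

    static class LongPolling extends Listener {
        Class<? extends Event> interest() { return LocalChanged.class; }
        void onEvent(Event event) { LOG.add("LongPolling"); }
    }

    // Delivers the event only to listeners interested in its exact type.
    static void fireEvent(Event event) {
        if (event == null) {
            throw new IllegalArgumentException("event is null");
        }
        for (Listener listener : LISTENERS) {
            if (listener.interest() == event.getClass()) {
                try {
                    listener.onEvent(event);
                } catch (Exception e) {
                    LOG.add("error: " + e); // one failing listener must not stop the others
                }
            }
        }
    }

    public static void main(String[] args) {
        new AsyncNotify();
        new LongPolling();
        fireEvent(new ConfigChanged());
        System.out.println(LOG);
    }
}
```

In this model only `AsyncNotify` reacts to a `ConfigChanged` event, which mirrors why AsyncNotifyService is the class to read next.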
We can look directly at AsyncNotifyService's onEvent method:
public void onEvent(Event event) {
    // ConfigDataChangeEvent may be produced concurrently
    if (event instanceof ConfigDataChangeEvent) {
        ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
        long dumpTs = evt.lastModifiedTs;
        String dataId = evt.dataId;
        String group = evt.group;
        String tenant = evt.tenant;
        String tag = evt.tag;
        // e.g. Member{address='192.168.31.192:8848'}
        Collection<Member> ipList = memberManager.allMembers();
        // In fact, any type of queue is fine here
        Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
        for (Member member : ipList) {
            // One notification task per server node
            queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs,
                member.getAddress(), evt.isBeta));
        }
        EXECUTOR.execute(new AsyncTask(httpclient, queue));
    }
}
The method above does two things:
It gets all Nacos server nodes and submits the asynchronous task AsyncTask for them.
AsyncTask takes each node's NotifySingleTask from the queue and sends an HTTP request to that node to notify it of the configuration change.
The endpoint that receives this request is implemented in CommunicationController:
/**
 * notify configuration information changes
 */
@GetMapping("/dataChange")
This method is analyzed later.
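The fan-out performed by AsyncNotifyService can be pictured with a small sketch: one task per server node is queued, and a worker drains the queue and "notifies" each node. Everything here is hypothetical (`NotifyFanOutSketch`, `NotifyTask`, the URL shape); a real implementation would issue an HTTP request where this sketch only records the URL, and the real request path and parameters are not shown in the article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative fan-out: queue one notification per cluster member, drain it asynchronously.
class NotifyFanOutSketch {
    static final class NotifyTask {
        final String dataId;
        final String group;
        final String target;

        NotifyTask(String dataId, String group, String target) {
            this.dataId = dataId;
            this.group = group;
            this.target = target;
        }

        // Placeholder URL; only the "/dataChange" suffix comes from the article.
        String url() {
            return "http://" + target + "/dataChange?dataId=" + dataId + "&group=" + group;
        }
    }

    // Returns the URLs that would have been called, in queue order.
    static List<String> notifyMembers(String dataId, String group, List<String> members) {
        Queue<NotifyTask> queue = new LinkedList<>();
        for (String member : members) {
            queue.add(new NotifyTask(dataId, group, member));
        }
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            NotifyTask task;
            while ((task = queue.poll()) != null) {
                sent.add(task.url()); // a real implementation would send the HTTP request here
            }
        });
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        members.add("192.168.31.192:8848");
        members.add("192.168.31.193:8848");
        System.out.println(notifyMembers("test", "DEFAULT_GROUP", members));
    }
}
```

Notifying every node, including the one that handled the publish, keeps the logic uniform: each node reacts to the same notification in the same way.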
A closer look at configuration subscription
Initialize:
When NacosConfigService initializes, ClientWorker is constructed and two thread pools are started through ClientWorker.
worker = new ClientWorker(agent, configFilterChainManager, properties);
The first thread pool runs checkConfigInfo() every 10 ms:
executor.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        try {
            checkConfigInfo();
        } catch (Throwable e) {
            LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
        }
    }
}, 1L, 10L, TimeUnit.MILLISECONDS);
Let's see what exactly checkConfigInfo does.
public void checkConfigInfo() {
    // Split the work into tasks
    int listenerSize = cacheMap.get().size();
    // Round up to get the number of batches; this caps how many configurations one LongPollingRunnable handles
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // Whether a task is already running needs careful thought here.
            // The task list is currently unordered, so changes may cause problems.
            executorService.execute(new LongPollingRunnable(i));
            // i here is the taskId
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
The main job of this method is to submit LongPollingRunnable tasks to the second thread pool.
Each LongPollingRunnable handles at most 3000 configurations.
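As a worked example of the rounding above, the sketch below computes how many long-polling tasks are needed for a given number of listened configurations. The class and method names are hypothetical; the batch size of 3000 is the figure stated in this article.

```java
// Illustrative arithmetic for splitting listened configurations into long-polling tasks.
class LongPollingBatchSketch {
    // Batch size from the article; declared as double so the division below is not truncated.
    static final double PER_TASK_CONFIG_SIZE = 3000;

    // Number of tasks needed: listenerSize / 3000, rounded up.
    static int taskCount(int listenerSize) {
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    public static void main(String[] args) {
        int[] sizes = {0, 1, 3000, 3001, 7500};
        for (int size : sizes) {
            System.out.println(size + " -> " + taskCount(size));
        }
    }
}
```

With these inputs the counts are 0, 1, 1, 2 and 3. Note that the division has to be done in floating point: with two int operands the result would already be truncated before Math.ceil sees it, and 3001 configurations would yield 1 task instead of 2.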
Let's take a look at the implementation of LongPollingRunnable
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
    // check failover config
    for (CacheData cacheData : cacheMap.get().values()) {
        if (cacheData.getTaskId() == taskId) {
            cacheDatas.add(cacheData);
            ...
        }
    }
The configuration information is saved in cacheMap, which is loaded and acquired from disk.
Get the configuration that needs to be processed by the current LongPollingRunnable task from the cacheMap through taskId and put it into the cacheDatas collection.
Let's see where the taskId is set.
int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
cache.setTaskId(taskId);
This matches what we saw above: every 3000 configurations share the same taskId, because each LongPollingRunnable task handles 3000 configurations.
// check server config: ask the server which configurations have changed
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
// Get the list of dataIds whose values changed on the server. Only dataId and group are valid in the returned objects. Never returns null.
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
Here the subscribing client sends an HTTP long-polling request to the server to obtain the changed configurations.
A long-polling request does not return immediately; it returns as soon as a configuration changes. The timeout is 30s:
if no configuration changes within that time, the request returns anyway, and the client then issues a new long-polling request.
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener",
    headers, params,
    agent.getEncode(), readTimeoutMs);
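The client-side behaviour described above (block until a change or a timeout, then ask again) can be simulated without a network. In this sketch a `BlockingQueue` stands in for the server; `LongPollClientSketch` and its methods are hypothetical names, and the short timeouts are chosen only so the example finishes quickly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative long-polling client; the queue plays the role of the server.
class LongPollClientSketch {
    // A configuration change is simulated by offering its group key to this queue.
    static final BlockingQueue<String> SERVER_CHANGES = new LinkedBlockingQueue<>();

    // One long-polling request: returns the changed key as soon as one is available,
    // or null if nothing changed before the timeout elapsed.
    static String longPollOnce(long timeoutMs) {
        try {
            return SERVER_CHANGES.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Re-issues the request after every return; bounded here so the demo terminates.
    static int pollRounds(int rounds, long timeoutMs) {
        int changes = 0;
        for (int i = 0; i < rounds; i++) {
            String changed = longPollOnce(timeoutMs);
            if (changed != null) {
                changes++;
                System.out.println("changed: " + changed);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        SERVER_CHANGES.offer("test+DEFAULT_GROUP");
        // Round 1 returns immediately with the change; round 2 waits 100 ms and times out.
        System.out.println("changes seen: " + pollRounds(2, 100));
    }
}
```

Compared with polling at a fixed short interval, this keeps the number of requests low while still delivering a change almost as soon as it happens.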
The long-polling timeout defaults to 30s:
timeout = Math.max(NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
    Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
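The expression above reads a configured value, falls back to a default when it is missing or not a number, and then enforces a lower bound. A standalone sketch of that logic follows. The 30000 ms default reflects the 30s stated in this article; the 10000 ms floor and the property name `configLongPollTimeout` are assumptions made for illustration, and `toInt` is a local helper rather than the library method.

```java
import java.util.Properties;

// Illustrative resolution of the long-polling timeout: configured value, default, lower bound.
class LongPollTimeoutSketch {
    static final int DEFAULT_TIMEOUT_MS = 30000;          // 30s default, as stated in the article
    static final int MIN_TIMEOUT_MS = 10000;              // assumed lower bound
    static final String KEY = "configLongPollTimeout";    // assumed property name

    // Parses an int, returning the default for null or malformed input.
    static int toInt(String value, int defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    static int resolveTimeout(Properties properties) {
        return Math.max(toInt(properties.getProperty(KEY), DEFAULT_TIMEOUT_MS), MIN_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        System.out.println(resolveTimeout(properties)); // no value configured: the default applies
        properties.setProperty(KEY, "5000");
        System.out.println(resolveTimeout(properties)); // below the floor: the floor applies
    }
}
```

Under these assumed constants the two printed values are 30000 and 10000: Math.max only ever raises a value that is too small, it never caps a large one.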
The server-side handler for this request is in ConfigController:
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
    ....
    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
The doPollingConfig method:
// The server handles long-polling requests
if (LongPollingService.isSupportLongPolling(request)) {
    longPollingService.addLongPollingClient(request, response, clientMd5Map,
        probeRequestSize);
    return HttpServletResponse.SC_OK + "";
}
A thread pool is used to process the requests:
scheduler.execute(
    new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout,
        appName, tag));
ClientLongPolling is a Runnable.
When it runs, it first schedules a delayed task and then adds itself to a queue: allSubs.add(this)
allSubs holds all pending long-polling requests.
So something must consume the requests in the allSubs queue.
That consumer is the onEvent method:
LongPollingService is one of the AbstractEventListener implementations mentioned above, so it implements onEvent as well.
@Override
public void onEvent(Event event) {
    if (isFixedPolling()) {
        // ignore
    } else {
        if (event instanceof LocalDataChangeEvent) {
            LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
            scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta,
                evt.betaIps));
        }
    }
}
This onEvent method handles configuration changes, and the main logic is in DataChangeTask:
It finds the requests in allSubs that subscribe to the changed dataId+group, for example test+DEFAULT_GROUP,
and then completes those long-polling requests by sending the response.
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
    ClientLongPolling clientSub = iter.next();
    // groupKey, e.g. test+DEFAULT_GROUP
    if (clientSub.clientMd5Map.containsKey(groupKey)) {
        ...
        iter.remove(); // Delete the subscription relationship
        LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
            (System.currentTimeMillis() - changeTime),
            "in-advance",
            RequestUtil.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
            "polling",
            clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
        clientSub.sendResponse(Arrays.asList(groupKey));
    }
}
Where is the onEvent method of LongPollingService triggered?
After the configuration is published, of course. Remember the dataChange endpoint in CommunicationController?
After a configuration is published, the dataChange endpoint of the Nacos server is called through an HTTP request, and through it the long-polling requests held on that server are notified.
Because the publisher iterates over all Nacos server nodes, the long-polling request for the changed configuration is reached no matter which node holds it.
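The hold-and-release pattern described above (park each long-polling request in allSubs, then answer the matching ones when a change arrives) can be sketched with `CompletableFuture`. This is a simplified stand-in, not the Nacos implementation: the class and method names are illustrative, the servlet AsyncContext is replaced by a future, and the delayed timeout task is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative server side of long polling: requests wait in a queue until their key changes.
class LongPollServerSketch {
    static final class ClientSub {
        final Set<String> groupKeys; // keys this request is listening to
        final CompletableFuture<List<String>> response = new CompletableFuture<>();

        ClientSub(Set<String> groupKeys) {
            this.groupKeys = groupKeys;
        }
    }

    static final Queue<ClientSub> ALL_SUBS = new ConcurrentLinkedQueue<>();

    // Parks a request; the returned future completes when one of its keys changes.
    static CompletableFuture<List<String>> addLongPollingClient(Set<String> groupKeys) {
        ClientSub sub = new ClientSub(groupKeys);
        ALL_SUBS.add(sub);
        return sub.response;
    }

    // Called on a data change: answers and removes every request listening to groupKey.
    static int onDataChange(String groupKey) {
        int released = 0;
        for (Iterator<ClientSub> iter = ALL_SUBS.iterator(); iter.hasNext(); ) {
            ClientSub sub = iter.next();
            if (sub.groupKeys.contains(groupKey)) {
                iter.remove(); // the subscription is one-shot; the client will poll again
                List<String> changed = new ArrayList<>();
                changed.add(groupKey);
                sub.response.complete(changed);
                released++;
            }
        }
        return released;
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> pending =
            addLongPollingClient(Collections.singleton("test+DEFAULT_GROUP"));
        System.out.println("done before change: " + pending.isDone());
        onDataChange("test+DEFAULT_GROUP");
        System.out.println("response: " + pending.join());
    }
}
```

A complete version would also need the delayed task mentioned earlier, so that a request whose keys never change is removed from the queue and answered when its timeout expires.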
/**
 * notify configuration information changes
 */
@GetMapping("/dataChange")
Here a method in DumpService is called to save the configuration to disk and cache its md5.
DiskUtil.saveToDisk(dataId, group, tenant, content);
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
    CacheItem cache = makeSure(groupKey);
    // Fire the event only when the md5 actually differs from the cached one
    if (cache.md5 == null || !cache.md5.equals(md5)) {
        cache.md5 = md5;
        cache.lastModifiedTs = lastModifiedTs;
        EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
    }
}
You can see that when the configuration changes, a LocalDataChangeEvent is fired through fireEvent.
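The md5 comparison is what keeps an unchanged configuration from waking up long-polling clients. A minimal sketch of that check, using only the JDK, might look like this; `Md5ChangeSketch` and its map are hypothetical, and the boolean return value stands in for firing the event.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative change detection: compare the new md5 with the cached one per group key.
class Md5ChangeSketch {
    static final Map<String, String> MD5_CACHE = new ConcurrentHashMap<>();

    // Hex-encoded MD5 of the content, zero-padded to 32 characters.
    static String md5Hex(String content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] bytes = digest.digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, bytes));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Updates the cache and returns true only when the md5 differs from the cached value,
    // which is the point where a change event would be fired.
    static boolean updateMd5(String groupKey, String md5) {
        String cached = MD5_CACHE.get(groupKey);
        if (cached == null || !cached.equals(md5)) {
            MD5_CACHE.put(groupKey, md5);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "test+DEFAULT_GROUP";
        System.out.println(updateMd5(key, md5Hex("test config body")));   // first publish
        System.out.println(updateMd5(key, md5Hex("test config body")));   // same content again
        System.out.println(updateMd5(key, md5Hex("new config body")));    // content changed
    }
}
```

Publishing identical content twice reports a change only the first time, so republishing the same body does not trigger another round of notifications in this model.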
That concludes this look at how Nacos works as a configuration center in Java. Theory sticks best when paired with practice, so try the publish and subscribe demo yourself and trace the flow through the code.
© 2024 shulou.com SLNews company. All rights reserved.