Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze that the KubernetesClientException resource version is too old

2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

How to carry out the analysis of the old version of KubernetesClientException resources, I believe that many inexperienced people are at a loss about this. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

Background

The company is currently scheduling based on K8s (based on io.fabric8:kubernetes-client:4.2.0). In the process of operation, the company has encountered the following problems:

DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager-WebSocket close received. Code: 1000, reason:DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager-Submitting reconnect task to the executor [reconnectReconnect | Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager-Scheduling reconnect task [reconnectAttempt | Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager-Connecting websocket. Io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@700f518a199-2020-11-17T06:39:13.874Z-[merlion-k8s-backend]-[merlion-k8s-backend-6b4cc44855-s6wnq]: 06 17T06:39:13.874Z 39 17T06:39:13.874Z 13.873 [OkHttp https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager-WebSocket successfully opened WARN PodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.) io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 135562761 (135563127) at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage (WatchConnectionManager.java:254) [kubernetes-client-4.2.2.jar:?] at okhttp3.internal.ws.RealWebSocket.onReadMessage (RealWebSocket.java:323) [okhttp-3.12. 0. At okhttp3.internal.ws.WebSocketReader.readMessageFrame (WebSocketReader.java:219) [okhttp-3.12.0.jar:?] at okhttp3.internal.ws.WebSocketReader.processNextFrame (WebSocketReader.java:105) [okhttp-3.12.0.jar:?] at okhttp3.internal.ws.RealWebSocket.loopReader (RealWebSocket.java:274) [okhttp-3.12.0.jar:?] at okhttp3.internal.ws.RealWebSocket$2.onResponse (RealWebSocket.java:214) [okhttp- 3.12.0. At okhttp3.RealCall$AsyncCall.execute (RealCall.java:206) [okhttp-3.12.0.jar:?] at okhttp3.internal.NamedRunnable.run (NamedRunnable.java:32) [okhttp-3.12.0.jar:?] at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) [?: 1.8.0 January 191] at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624) [?: 1.8. 08.191] at java.lang.Thread.run (Thread.java:748) [?: 1.8.0U191]

This problem alone is not a big deal, but the code is:

WatchConnection = kubernetesClient.pods () .withLabel (MERLION_TASK_LABEL, applicationId) / / .withResourceVersion (resourceVersion) .watch (new TaskPodsWatcher ())

Because we have commented out withResourceVersion (resourceVersion), (if not, the resourceVersion set in our code is too small), but we will still report too old resource version

Analysis.

Jump directly to WatchConnectionManager onClosed as follows:

@ Override public void onClosed (WebSocket webSocket, int code, String reason) {logger.debug ("WebSocket close received. Code: {}, reason: {} ", code, reason); if (forceClosed.get ()) {logger.debug (" Ignoring onClose for already closed/closing websocket "); return;} if (currentReconnectAttempt.get () > = reconnectLimit & & reconnectLimit > = 0) {closeEvent (new KubernetesClientException (" Connection unexpectedly closed ")); return;} scheduleReconnect ();}

The explanation for onclosed is

/ * * Invoked when both peers have indicated that no more messages will be transmitted and the * connection has been successfully released. No further calls to this listener will be made. * / public void onClosed (WebSocket webSocket, int code, String reason) {}

It indicates that the connect is released because there is no transmission of event for a long time, which causes the WebSocket to be closed (which is very likely to happen in the case of not many tasks), resulting in a reconnection operation scheduleReconnect, and this method calls runWatch ():

Executor.schedule (new NamedRunnable ("reconnectAttempt") {@ Override public void execute () {try {runWatch (); reconnectPending.set (false);} catch (Exception e) {/ / An unexpected error occurred and we didn't even get an onFailure callback. Logger.error ("Exception in reconnect", e); webSocketRef.set (null); closeEvent (new KubernetesClientException ("Unhandled exception in reconnect attempt", e)); close ();}, nextReconnectInterval (), TimeUnit.MILLISECONDS);}

In the runWatch () method, we call the

If (this.resourceVersion.get ()! = null) {httpUrlBuilder.addQueryParameter ("resourceVersion", this.resourceVersion.get ());}

The this.resourceVersion value is set in the public void onMessage (WebSocket webSocket, String message) method:

WatchEvent event = readWatchEvent (message); Object object = event.getObject (); if (object instanceof HasMetadata) {@ SuppressWarnings ("unchecked") T obj = (T) object; / / Dirty cast-should always be valid though resourceVersion.set (HasMetadata) obj). GetMetadata (). GetResourceVersion (); Watcher.Action action = Watcher.Action.valueOf (event.getType ()) Watcher.eventReceived (action, obj);} else if (object instanceof KubernetesResourceList) {@ SuppressWarnings ("unchecked") KubernetesResourceList list = (KubernetesResourceList) object; / / Dirty cast-should always be valid though resourceVersion.set (list.getMetadata (). GetResourceVersion ()); Watcher.Action action = Watcher.Action.valueOf (event.getType ()); List items = list.getItems () If (items! = null) {for (HasMetadata item: items) {watcher.eventReceived (action, (T) item);}

That is, if the last setting of resourceVersion exceeds the minimum resourceVersion retained by etc when reconnecting, a too old resource version error will be reported:

Solve

Through an online search for kubernetes-too-old-resource-version, the Kubernetes Client team memeber mentions:

Fabric8 does not handle it with plain watch. But it is handling it in SharedInformer API, see ReflectorWatcher. I would recommend using informer API when writing operators since it's better than plain list and watch

In other words, we can implement it in SharedInformer api, but the watch mechanism cannot handle this situation, so we can implement it in SharedInformer. As of November 16, 2020, we have obtained the latest version of kubernetes-client, kubernetes-client:4.13.0, coded implementation:

Val sharedInformerFactory = kubernetesClient.informers () val podInformer = sharedInformerFactory .sharedIndexInformer for (classOf [Pod], classOf [PodList], new OperationContext (). WithNamespace ("test"), 30 * 1000L) podInformer.addEventHandler (new ResourceEventHandler [Pod] {override def onAdd (obj: Pod): Unit = {eventReceived (obj, "ADD")} override def onDelete (obj: Pod, deletedFinalStateUnknown: Boolean): Unit = {eventReceived (obj) "DELETE")} override def onUpdate (oldObj: Pod, newObj: Pod): Unit = {eventReceived (newObj, "UPDATE")} private def idShouldUpdate (pod: Pod): Boolean = {pod.getMetadata.getLabels.getOrDefault (MERLION_TASK_LABEL, ") = applicationId} private def eventReceived (pod: Pod) Action: String): Unit = {if (idShouldUpdate (pod)) {val podName = pod.getMetadata.getName logger.info (s "Received job pod update for pod named $podName, action ${action}") snapshotsStore.updatePod (pod)}}) sharedInformerFactory.startAllRegisteredInformers ()}

The SharedInformerFactory mechanism is the same as the K8s Informer mechanism, which can ensure the reliability of messages, the most important of which are ReflectorWatcher, Reflector and DefaultSharedIndexInformer. Let's briefly analyze:

Public DefaultSharedIndexInformer (Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) {this.resyncCheckPeriodMillis = resyncPeriod; this.defaultEventHandlerResyncPeriod = resyncPeriod; this.processor = new SharedProcessor (); this.indexer = new Cache (); DeltaFIFO fifo = new DeltaFIFO (Cache::metaNamespaceKeyFunc, this.indexer); this.controller = new Controller (apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners) ControllerThread = new Thread (controller::run, "informer-controller-" + apiTypeClass.getSimpleName ();}

In DefaultSharedIndexInformer, DeltaFIFO is used as the storage of event, and the call to this::handleDeltas is called when Controller is called as the parameter processFunc function of this.queue.pop, that is, this function consumes the event in fifo, as follows:

Private void processLoop () throws Exception {while (true) {try {this.queue.pop (this.processFunc);} catch (InterruptedException t) {log.error ("DefaultController#processLoop got interrupted {}", t.getMessage (), t); return;} catch (Exception) {log.error ("DefaultController#processLoop recovered from crashing {}", e.getMessage (), e); throw e }}

And queue is also a parameter of DeltaFIFO, that is to say, queue is fifo, and where does the data in fifo come from? In the controller::run function:

If (fullResyncPeriod > 0) {reflector = new Reflector (apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);} else {reflector = new Reflector (apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);} reflector.listAndWatch ()

The reflector.listAndWatch () method will be called, which performs a list-watch mechanism similar to K8s, as follows:

Public void listAndWatch () throws Exception {try {log.info ("Started ReflectorRunnable watch for {}", apiTypeClass); reListAndSync (); resyncExecutor.scheduleWithFixedDelay (this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS); startWatcher ();} catch (Exception exception) {store.isPopulated (false); throw new RejectedExecutionException ("Error while starting ReflectorRunnable watch", exception);}}

ReListAndSync pulls full amount of event data, and startWatcher performs watch to obtain incremental event data. What is this watch? As follows:

Watch.set (listerWatcher.watch (new ListOptionsBuilder (). WithWatch (Boolean.TRUE). WithResourceVersion (lastSyncResourceVersion.get ()). WithTimeoutSeconds (null). Build (), operationContext.getNamespace (), operationContext, watcher))

The watcher here is initialized in the constructor of reflector

Watcher = new ReflectorWatcher (store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync)

ReflectorWatcher inherits from Watcher, so there are corresponding eventReceived methods and onClose methods, as follows:

@ Override public void eventReceived (Action action, T resource) {if (action = = null) {final String errorMessage = String.format ("Unrecognized event% s", resource.getMetadata (). GetName ()); log.error (errorMessage); throw new KubernetesClientException (errorMessage);} log.info ("Event received {}", action.name ()) Switch (action) {case ERROR: final String errorMessage = String.format ("ERROR event for% s", resource.getMetadata (). GetName ()); log.error (errorMessage); throw new KubernetesClientException (errorMessage); case ADDED: store.add (resource); break; case MODIFIED: store.update (resource); break; case DELETED: store.delete (resource) Break;} lastSyncResourceVersion.set (resource.getMetadata (). GetResourceVersion ()); log.debug ("{} # Receiving resourceVersion {}", resource.getKind (), lastSyncResourceVersion.get ());} @ Override public void onClose (KubernetesClientException exception) {log.error ("Watch closing"); Optional.ofNullable (exception) .map (e-> {log.debug ("Exception received during watch", e); return exception ) .map (KubernetesClientException::getStatus) .map (Status::getCode) .filter (c-> c.equals (HttpURLConnection.HTTP_GONE)) .ifPresent (c-> onHttpGone.run ()); onClose.run ();}

In the eventReceived method, all messages are added to the store, that is, the queue of fifo. In the onClose method, we see that if HTTP_GONE, that is, too old resource version, will perform onHttpGone.run () and onClose.run (), and onHttpGone is the reListAndSync function of Reflector, and onClose is the startWatcher function of Reflector, that is, once the watcher is turned off, watch will be redone.

Be careful

In kubernetes-client:4.6.2, WatchConnectionManager onMessage treats HTTP_GONE differently, as follows:

If (status.getCode () = = HTTP_GONE) {logger.info ("The resource version {} no longer exists. Scheduling a reconnect.", resourceVersion.get ()); resourceVersion.set (null); scheduleReconnect ();} else {logger.error ("Error received: {}", status.toString ());}

Once HTTP_GONE, occurs, the resourceVersion will be set to null, that is, the latest event will be obtained, and it will be reconnected immediately, while in version 4.13.0 and 4.2.0, it will not be reconnected immediately, but will be left to the user to deal with.

After reading the above, do you know how to analyze the KubernetesClientException resource version that is too old? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report