2025-04-01 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article describes how to solve restarts caused by zk (ZooKeeper) in Flink, based on a problem we ran into in daily operation.
Background
Recently, while running a Flink program on K8s, I found that it often restarted at one particular moment. A scheduled task loads a cache once a day; the cache contains a large amount of data, and loading takes about 60-90 s. This scheduled task often caused K8s to restart the program, making it extremely unstable, so I went through several rounds of tuning.
Memory dependent
I suspected that, while the cache was loading, communication between an operator's sender and receiver might become unreachable. The default heartbeat timeout is 50 s, so I changed the parameters: heartbeat.timeout: 18000 and heartbeat.interval: 20000.
JobManager and TaskManager communicate through Akka, so I also set akka.ask.timeout: 240s.
After these changes, exceptions still occurred occasionally while the cache was loading. The relevant log excerpt is as follows:
2020-10-16 17:05:05,939 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 29068ms for sessionid 0x30135fa8005449f, closing socket connection and attempting reconnect
2020-10-16 17:05:07,609 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
2020-10-16 17:05:07,611 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-10-16 17:… WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-10-16 17:… INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 1bb3b7bdcfbc39cf760064ed9736ea80 with leader id bed26e07640e5e79197e468c85354534 lost leadership.
2020-10-16 17:05:07,614 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 1bb3b7bdcfbc39cf760064ed9736ea80.
2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762).
2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 1bb3b7bdcfbc39cf760064ed9736ea80 lost the leadership.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1274)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:155)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1698)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Job leader for job id 1bb3b7bdcfbc39cf760064ed9736ea80 lost leadership.
    ... 22 more
After investigation, this turned out to be related to zk. When zk switches leader or the network fluctuates, the connection enters the SUSPENDED state, which leads to the "lost the leadership" error. When this error occurs, K8s restarts the program directly, even though zk can in fact still be reached normally. Further digging showed that others ran into this problem long ago and patched the code, but Flink has not officially merged the change. The investigation itself is not shown here; the useful links are as follows:
https://www.cnblogs.com/029zz010buct/p/10946244.html
This one suggests upgrading the Curator package. Flink uses 2.12.0, so I left that alone for now. Using the SessionConnectionStateErrorPolicy mentioned there, which is available in the 4.x versions, would probably still require recompiling part of the code.
https://github.com/apache/flink/pull/9066
https://issues.apache.org/jira/browse/FLINK-10052
This is someone else's solution, and I used the same approach: do not treat the SUSPENDED state as lost leadership. The change is made in the handleStateChange method of Curator's LeaderLatch.
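To make the idea concrete, here is a minimal, self-contained model of the two behaviours. It is not Curator code: the class LeaderLatchModel, the enum ConnState and the replay helper are names made up for illustration, and the "stock" branch assumes the unpatched handler treats SUSPENDED the same as LOST and always resets on RECONNECTED.

```java
/**
 * Toy model of a leader latch reacting to connection-state changes.
 * Illustrative only: names are hypothetical and no ZooKeeper is involved.
 */
final class LeaderLatchModel {
    enum ConnState { SUSPENDED, RECONNECTED, LOST }

    private final boolean patched;          // true = SUSPENDED is tolerated
    private boolean hasLeadership = true;   // the model starts out as leader
    private int lostNotifications = 0;      // how often listeners would be told "leadership lost"

    LeaderLatchModel(boolean patched) {
        this.patched = patched;
    }

    void handleStateChange(ConnState newState) {
        switch (newState) {
            case RECONNECTED:
                // Assumed stock behaviour: always re-enter the election.
                // Patched behaviour: only if leadership was really lost.
                if (!patched || !hasLeadership) {
                    reset();
                }
                break;
            case SUSPENDED:
                // Assumed stock behaviour: give up leadership immediately.
                // Patched behaviour: ignore and wait for RECONNECTED or LOST.
                if (!patched) {
                    setLeadership(false);
                }
                break;
            case LOST:
                setLeadership(false);
                break;
        }
    }

    private void reset() {
        // A real latch would re-run the election here; the model only drops leadership.
        setLeadership(false);
    }

    private void setLeadership(boolean newValue) {
        if (hasLeadership && !newValue) {
            lostNotifications++;
        }
        hasLeadership = newValue;
    }

    boolean hasLeadership() {
        return hasLeadership;
    }

    int lostNotifications() {
        return lostNotifications;
    }

    /** Feeds a sequence of connection events into a fresh model and returns it. */
    static LeaderLatchModel replay(boolean patched, ConnState... events) {
        LeaderLatchModel model = new LeaderLatchModel(patched);
        for (ConnState event : events) {
            model.handleStateChange(event);
        }
        return model;
    }
}
```

Replaying SUSPENDED followed by RECONNECTED, which is what a short network blip looks like, costs the stock model its leadership once, while the patched model keeps it. A real session loss (LOST) still revokes leadership in both variants.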
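The modified cases in handleStateChange:

    case RECONNECTED:
    {
        try
        {
            // Re-enter the election only if leadership was actually lost
            if ( !hasLeadership.get() )
            {
                reset();
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            log.error("Could not reset leader latch", e);
            setLeadership(false);
        }
        break;
    }

    // The SUSPENDED case has been removed; only LOST revokes leadership
    case LOST:
    {
        setLeadership(false);
        break;
    }

Compile flink-shaded-hadoop-2-uber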
Tracing this code naturally leads to the flink-shaded-hadoop-2-uber-xxx.jar package. Flink 1.10 still supports this Hadoop package; from 1.11 on it is no longer officially provided and you have to download it yourself. Because we deliberately add this jar to our image, I targeted this package and recompiled it. The build process, briefly:
Download the source of this version from https://github.com/apache/curator/tree/apache-curator-2.12.0 and modify src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java under curator-recipes as shown above, then build the package as version 2.12.0.
Download the flink-shaded source from https://github.com/apache/flink-shaded/tree/release-10.0 and edit the pom file of flink-shaded-hadoop-2-parent: add an exclusion that removes the curator-recipes dependency, then add the curator-recipes you compiled yourself. If the dependency is not excluded, the default version 2.7.1 is used; this part of the code has probably not been touched for years, so the version has stayed at 2.7.1.
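    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <!-- ... some exclusions omitted ... -->
            <exclusion>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>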
Because we use version 2.8.3-10.0 while the Hadoop version in the source is 2.4.1, I changed it to 2.8.3.
Following the README.md in the root directory, run mvn package -Dshade-sources in the flink-shaded-release-10.0/flink-shaded-hadoop-2-parent directory. When packaging finishes, decompile the result with a tool to confirm that the SUSPENDED code has indeed been removed, then rebuild the image and run the program.
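Before decompiling, it can help to confirm which LeaderLatch class actually ended up in the rebuilt jar, since a shaded build may relocate packages. The following is only a sketch using the JDK's java.util.jar API; the class name JarEntryFinder is made up, and the jar path and the entry suffix passed to it are assumptions you would adapt to your own build output.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

/** Lists jar entries by name suffix; a helper for locating a class before decompiling it. */
final class JarEntryFinder {

    /** Returns the sorted names of all entries in the jar whose name ends with the given suffix. */
    static List<String> findEntries(Path jar, String suffix) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.stream()
                    .map(JarEntry::getName)
                    .filter(name -> name.endsWith(suffix))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the rebuilt uber jar (supplied by the caller, not assumed here)
        for (String name : findEntries(Paths.get(args[0]), "recipes/leader/LeaderLatch.class")) {
            System.out.println(name);
        }
    }
}
```

If more than one entry is printed, the jar contains several copies of the class, and each of them is worth decompiling to check that the patched one is the copy that will be loaded.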