2025-03-29 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
What causes Flink checkpoint failures, and how can they be resolved? Many newcomers are unclear about this, so the following explains the problem and its solution in detail.
I have been working with Flink for some time and have run into a number of problems. One of them, job restarts caused by checkpoint failures, has come up many times. The job generally returns to normal after the restart, so I never paid much attention to it. Over the past two days, however, some colleagues have hit it frequently, so here is a record of the analysis and the solution.
Our Flink test environment has three nodes. An HDFS DataNode is deployed on each Flink node, and HDFS is used for Flink checkpoints and savepoints.
Symptom
According to the log, three datanodes are alive and the file's replication factor is 1, yet the file write failed.
There are 3 datanode(s) running and no node(s) are excluded
I searched online for this error but found no direct answer. The namenode log did not contain any more direct information either.
Everything looked normal in the web UI on port 50070. The datanodes still had plenty of free space, with utilization below 10%.
I tried putting a file onto HDFS and getting it back, and both worked, which showed that the HDFS service itself was fine and the datanodes were reachable.
Log observation 1
I kept reading the namenode log and noticed some warning messages.
At this point I suspected a problem with the block placement policy.
Following the hint in the log, turn on the corresponding debug switches.
Modify
etc/hadoop/log4j.properties
Find the line
log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN
Following the same format, add these lines below it
log4j.logger.org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy=DEBUG
log4j.logger.org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor=DEBUG
log4j.logger.org.apache.hadoop.net.NetworkTopology=DEBUG
Restart the namenode, then rerun the Flink job.
Log observation 2
What the log now shows is that the rack-aware placement policy cannot be satisfied. We do not provide a rack mapping script, so all nodes default to the same rack. On reflection, though, this should be unrelated to the failure.
Many production HDFS clusters run without a rack mapping script, and the checkpoint failures are intermittent rather than constant; at the very least, putting and getting files works normally.
So we turned to the HDFS source code. Based on the call stack in the log above, the places to look first are BlockPlacementPolicyDefault and the related DatanodeDescriptor.
Roughly, this code performs some checks on a datanode when choosing it for a block, such as how much space remains and how busy the node is.
When the problem reappeared, we looked at the log again and found the key information.
The log says that the datanode has 43G of storage remaining and that allocating the block needs only a little over 100M, but the scheduled size already exceeds 43G. In other words, a datanode that looks healthy to us is considered short of space by the namenode.
Cause
What does scheduled size mean? The code shows that the scheduled size is the block size multiplied by a counter, and that counter is the number of new file blocks scheduled to be written to the datanode. From these two values HDFS estimates the storage that may be needed, which amounts to booking space on each datanode in advance. Once a file has been written, the booking is released and the space actually occupied is accounted for instead.
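The check described above can be sketched in a few lines of Python. This is a simplified model rather than the HDFS source: the class and function names are hypothetical, and the only assumptions taken from the article are the 128M block size, the 43G of free space, and the rule that a new block must fit into what remains after the booked space is subtracted.

```python
from dataclasses import dataclass

GIB = 1024 ** 3
BLOCK_SIZE = 128 * 1024 * 1024  # 128 MB block size discussed in this article


@dataclass
class DataNodeState:
    """Hypothetical, simplified view of what the namenode tracks per datanode."""
    remaining_bytes: int   # free space reported by the datanode
    blocks_scheduled: int  # blocks handed out but not yet reported as written


def scheduled_size(node: DataNodeState, block_size: int = BLOCK_SIZE) -> int:
    """Space the namenode treats as already booked on this datanode."""
    return block_size * node.blocks_scheduled


def has_enough_space(node: DataNodeState, block_size: int = BLOCK_SIZE) -> bool:
    """A new block is accepted only if it fits after subtracting the bookings."""
    return block_size <= node.remaining_bytes - scheduled_size(node, block_size)


idle = DataNodeState(remaining_bytes=43 * GIB, blocks_scheduled=0)
busy = DataNodeState(remaining_bytes=43 * GIB, blocks_scheduled=344)

print(has_enough_space(idle))  # True
print(has_enough_space(busy))  # False: all 43G is booked, although almost nothing is written
```

The point of the model is that the decision depends on the number of pending blocks, not on how many bytes those blocks will eventually hold.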
With this principle understood, the conclusion is that for some period of time too much space had been booked on the datanodes.
For Flink's checkpoint mechanism, see this article: www.jianshu.com/p/9c587bd49.
Roughly speaking, many task threads on each taskmanager write to HDFS.
Looking at the HDFS directory structure, there are a large number of checkpoint files with UUID names, each of them very small.
When job parallelism is high, more checkpoint files are created on HDFS. Although each file is only a few KB, the space booked on a datanode is 128M multiplied by the number of files being allocated (every file is far smaller than 128M, so each occupies one block). How many files can be booked against 43G? Dividing 43G by 128M gives a little over 300 per node, so roughly 900 to 1,000 across three nodes. We run multiple jobs and their total parallelism is high, so it is very easy to hit this limit before the booked space has been fully released.
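The arithmetic can be checked with a short Python calculation. The 43G, 128M, and three-node figures come from the article; the 4 KiB file size is an assumed stand-in for "a few KB", and the function name is hypothetical. Replication is 1 in this test environment, so each file books one block on one datanode.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def max_pending_files(free_bytes: int, block_size: int) -> int:
    """One-block files that can be pending before bookings use up the free space."""
    return free_bytes // block_size


per_node = max_pending_files(43 * GIB, 128 * MIB)
cluster = per_node * 3  # three datanodes, replication factor 1

# Data actually written if every pending file is 4 KiB (assumed size).
payload_mib = cluster * 4 * 1024 / MIB

print(per_node, cluster)      # 344 1032
print(f"{payload_mib:.1f}")   # 4.0
```

Under these assumptions about 1,000 pending checkpoint files, holding roughly 4 MiB of real data in total, are enough for the namenode to regard all three datanodes as full.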
It is well known that HDFS is not suited to storing small files, because large numbers of small files consume inodes and inflate metadata such as block locations, which strains namenode memory. This example shows a further effect:
when the block size is large but the files are much smaller than the block size, a large number of such small files can make the datanodes directly "unavailable".
Solution
The block size is not a cluster attribute but a file attribute, and it can be set by the client. Here, every Flink taskmanager and jobmanager is an HDFS "client". According to the Flink documentation, we can configure it as follows.
1. Specify a configuration file path for HDFS in conf/flink-conf.yaml
fs.hdfs.hadoopconf: /home/xxxx/flink/conf
Here we chose the same directory as Flink's own configuration files.
2. Put two configuration files in that directory: core-site.xml and hdfs-site.xml
core-site.xml can be omitted if the checkpoint and savepoint paths specify a full HDFS address.
Add the block size setting to hdfs-site.xml. For example, here we set it to 1M.
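A minimal sketch of what such an hdfs-site.xml could contain is embedded in the Python snippet below. It assumes the standard HDFS property name dfs.blocksize and writes the 1M value in bytes (1048576). The helper function is hypothetical and only reads the value back, to confirm the fragment is well formed and to show how many pending one-block files 43G would allow at the new size.

```python
import xml.etree.ElementTree as ET
from typing import Optional

# Assumed minimal hdfs-site.xml; dfs.blocksize is given in bytes (1 MB).
HDFS_SITE_XML = """\
<configuration>
  <property>
    <name>dfs.blocksize</name>
    <value>1048576</value>
  </property>
</configuration>
"""


def read_property(xml_text: str, name: str) -> Optional[str]:
    """Return the value of a Hadoop-style <property> entry, or None if absent."""
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            return prop.findtext("value")
    return None


block_size = int(read_property(HDFS_SITE_XML, "dfs.blocksize"))
pending_per_node = (43 * 1024 ** 3) // block_size

print(block_size)        # 1048576
print(pending_per_node)  # 44032
```

With a 1M block size the same 43G of free space admits 44032 pending blocks per node instead of 344.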
The right block size depends on the size of your own jobs' state files, so observe them and adjust accordingly.
Restart the Flink cluster and submit the job. While it runs, keep an eye on the size of the HDFS fsimage, and take care that blocks that are too small combined with too many small files do not make the metadata too large.
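To get a feel for this trade-off, the following Python sketch counts how many blocks the namenode has to track per file at the two block sizes. The file sizes are assumed examples (a 4 KiB checkpoint file and a hypothetical 500 MiB state file), not measurements from our jobs.

```python
MIB = 1024 ** 2


def blocks_needed(file_size: int, block_size: int) -> int:
    """Blocks the namenode must track for one file (integer ceiling division)."""
    return (file_size + block_size - 1) // block_size


examples = [
    ("4 KiB checkpoint file", 4 * 1024),
    ("500 MiB state file", 500 * MIB),  # hypothetical large state file
]

for label, size in examples:
    at_128m = blocks_needed(size, 128 * MIB)
    at_1m = blocks_needed(size, 1 * MIB)
    print(f"{label}: {at_128m} block(s) at 128M, {at_1m} block(s) at 1M")
# 4 KiB checkpoint file: 1 block(s) at 128M, 1 block(s) at 1M
# 500 MiB state file: 4 block(s) at 128M, 500 block(s) at 1M
```

Small files cost one block either way, so the smaller block size only inflates metadata for state files larger than the new block size, which is why the actual state file sizes should guide the choice.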
We have added this fix to our automated cluster deployment script, which now sets the block size explicitly during deployment.
Flink's checkpoint scheme, with its dependence on HDFS, is somewhat heavy for lightweight stream-computing scenarios. Whether checkpoints are stored directly on the filesystem or through RocksDB, distributed storage requires HDFS. Judging by how checkpoints work and the type of data involved, ES should also be a good choice, but unfortunately the community does not provide such a solution.