Shulou (Shulou.com), SLTechnology News&Howtos, updated 2025-03-04
This article analyzes the advantages and disadvantages of Spark Streaming. It is aimed at readers who have little hands-on experience with it, and it summarizes the problems we ran into along with their causes and solutions.
Preface
In plain terms, this article is about the advantages and the pitfalls of Spark Streaming. The advantages are described at a high level, while the pitfalls are small details encountered in real-world use.
Roses
The roses are the advantages of Spark Streaming.
Rose: code reuse
This comes mainly from Spark's design and the breadth of the platform. Streaming code you write can easily be reused for batch and interactive processing on Spark, because all of them are built on the RDD model and the designers of Spark Streaming did a good job of encapsulation and compatibility. That is why I say the RDD is a very powerful box that can frame all kinds of scenarios; it is the result of a high degree of abstraction and careful thought.
Rose: machine learning
If you use Spark MLlib for model training, congratulations. First, many algorithms already support Spark Streaming; for example, k-means can update its model from streaming data. Second, you can load a model that was trained offline directly into Spark Streaming and run real-time predictions on new data.
Rose: SQL support
SQL, DataFrames, and Datasets can be used naturally in Spark Streaming, and time windows greatly extend the use cases, for example various kinds of system alerting. Doing something similar in Storm requires additional development and support.
Rose: effective control of throughput and latency
Spark Streaming gives you good control over how real-time the processing is: the batch interval can be hours, minutes, or seconds, and in extreme cases it can be set to milliseconds.
Rose: summary
Spark Streaming interacts well with the other Spark components and gets their support. It also benefits from the rapid growth of the Spark ecosystem.
Thorns
The thorns are the problems with Spark Streaming. Knowing them before you choose the technology can effectively reduce the risk of adopting it.
Thorn: checkpoint
Checkpointing is a good recovery mechanism, but the approach is crude: state is written directly to the file system through serialization, so after a recovery, code changes and configuration changes do not take effect. In practice, upgrades are far more frequent than crashes, yet an upgrade must resume seamlessly from the last offset. So when a Spark Streaming job cannot tolerate data loss, you need to record the offsets yourself and resume from the last recorded position.
For now, we have rewritten the relevant code to record the offsets for every batch, but we read our own recorded offsets only during an upgrade. In all other cases we still rely on the checkpoint mechanism.
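The policy described above (self-recorded offsets on upgrade, checkpoint otherwise) can be sketched as follows. This is only an illustration under stated assumptions: `PartitionKey`, `OffsetStore`, `InMemoryOffsetStore`, and `OffsetRecovery.startingOffsets` are hypothetical names, not Spark or Kafka APIs, and the in-memory store stands in for whatever durable storage is actually used.

```scala
// All names here are hypothetical; none of them is a Spark or Kafka API.
final case class PartitionKey(topic: String, partition: Int)

// Where the job records the offsets it has finished processing.
trait OffsetStore {
  def save(offsets: Map[PartitionKey, Long]): Unit
  def load(): Option[Map[PartitionKey, Long]]
}

// In-memory stand-in for a durable store such as ZooKeeper, a database, or HDFS.
final class InMemoryOffsetStore extends OffsetStore {
  private var latest: Option[Map[PartitionKey, Long]] = None
  def save(offsets: Map[PartitionKey, Long]): Unit = { latest = Some(offsets) }
  def load(): Option[Map[PartitionKey, Long]] = latest
}

object OffsetRecovery {
  // Some(offsets): start from the self-recorded offsets (upgrade case).
  // None: let the normal checkpoint recovery decide where to resume.
  def startingOffsets(isUpgrade: Boolean, store: OffsetStore): Option[Map[PartitionKey, Long]] =
    if (isUpgrade) store.load() else None
}
```

The job would call `save` after each batch completes and consult `startingOffsets` once at startup; how the upgrade flag is passed in (a command-line argument, for instance) is left open here.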
Thorn: Kafka
This one both is and is not about Spark Streaming. It is related because Spark handles many exceptions rather simplistically, and many of the problems come from Kafka configuration. For example:
If a message body is larger than the fetch.message.max.bytes limit (1 MB), Spark Streaming throws an OffsetOutOfRangeException and the service stops.
The corresponding error is thrown from this code:
    if (!iter.hasNext) {
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    }
In other words, once consumption finishes, the amount of data actually consumed does not match the amount that was estimated in advance.
The message you see in the log is produced by this code:
    private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
      s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
      s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
      " This should not happen, and indicates that messages may have been lost"
The fix, of course, is to increase fetch.message.max.bytes.
If you use Spark Streaming to backfill data by consuming Kafka from the earliest offset, and Kafka for some reason cleans up the old data quickly, this also triggers an OffsetOutOfRangeException and the Spark Streaming program terminates abnormally.
One solution is to record the mapping between Kafka offsets and time in advance (for example, every few seconds), and then use a timestamp to pick a later offset from which to start consuming.
Alternatively, add a margin to the offset returned by smallest, sized according to the current rate at which new data flows through Kafka, so that the data still exists by the time Spark Streaming fetches it.
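The first workaround, recording offsets against time and looking one up later, could look roughly like this for a single partition. `OffsetTimeIndex` is a hypothetical helper written for illustration; it is not part of Kafka or Spark, and how the samples are collected and persisted is assumed to happen elsewhere.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical helper, not a Kafka or Spark API: keeps (timestamp -> offset)
// samples for one partition, sorted by timestamp.
final case class OffsetTimeIndex(samples: TreeMap[Long, Long] = TreeMap.empty[Long, Long]) {

  // Record that `offset` was the latest offset observed at `timestampMs`.
  def record(timestampMs: Long, offset: Long): OffsetTimeIndex =
    copy(samples = samples.updated(timestampMs, offset))

  // Offset of the first sample taken at or after `timestampMs`, if there is one.
  def offsetAtOrAfter(timestampMs: Long): Option[Long] = {
    val it = samples.iteratorFrom(timestampMs)
    if (it.hasNext) Some(it.next()._2) else None
  }
}
```

Picking a timestamp slightly newer than the retention cutoff then yields a starting offset that Kafka should still hold, at the cost of skipping the data between the true earliest offset and that sample.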
Thorn: mapping Kafka partitions to RDD partitions
The number of Kafka partitions determines your parallelism (we assume you integrate using the Direct Approach). To get more parallelism you need a repartition, and a repartition means a shuffle, which can consume precious time in a streaming computation.
To avoid the shuffle and still increase the parallelism of Spark Streaming processing, we rewrote classes such as DirectKafkaInputDStream, KafkaRDD, and KafkaUtils so that one Kafka partition can be mapped to multiple RDD partitions. For example, M Kafka partitions can be mapped to M*N RDD partitions, where N is a positive integer greater than 1.
We hope the official project will eventually support mapping one Kafka partition to multiple Spark partitions, so as to avoid the extra data movement caused by a shuffle.
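The core of such a mapping is splitting the offset range of one Kafka partition into N contiguous sub-ranges, each of which would back one RDD partition. The sketch below shows only that arithmetic; `OffsetSlice` is a hypothetical type that mirrors the fields of an offset range, and it is not the authors' rewritten KafkaRDD.

```scala
// Hypothetical type mirroring the fields of a Kafka offset range.
final case class OffsetSlice(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

object OffsetSlice {
  // Split [fromOffset, untilOffset) into at most n contiguous, non-overlapping
  // slices of near-equal size. Empty slices are dropped.
  def split(range: OffsetSlice, n: Int): Seq[OffsetSlice] = {
    require(n >= 1, "n must be at least 1")
    val base = range.size / n
    val extra = range.size % n // the first `extra` slices get one more message
    (0 until n).map { i =>
      val start = range.fromOffset + i * base + math.min(i.toLong, extra)
      val end = range.fromOffset + (i + 1) * base + math.min(i.toLong + 1, extra)
      range.copy(fromOffset = start, untilOffset = end)
    }.filter(_.size > 0)
  }
}
```

For example, `OffsetSlice.split(OffsetSlice("events", 0, 100L, 110L), 3)` yields the ranges [100, 104), [104, 107), and [107, 110). One trade-off to keep in mind is that the slices are processed in parallel, so ordering within a Kafka partition is no longer preserved across them.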
Thorn: textFileStream
Many people probably use textFileStream, because it makes it very convenient to monitor the files under an HDFS directory and process them. One problem we ran into is that if there is a bad file underneath, such as a corrupted compressed file, you cannot skip it, and it makes Spark Streaming exit abnormally. The official API does not provide a suitable way to skip corrupted files.
Take NewHadoopRDD as an example. It contains these lines for fetching the next record:
    override def getNext(): (K, V) = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case eof: EOFException =>
          finished = true
      }
      if (!finished) {
        inputMetrics.incRecordsRead(1)
      }
      (key, value)
    }
Fetching the next record through the reader can throw an exception, for example on a corrupted gzip file. That exception is raised where user code cannot catch it, so the Spark Streaming program dies.
In the HadoopRDD class, the corresponding implementation is as follows:
    override def getNext(): (K, V) = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case eof: EOFException =>
          finished = true
      }
      if (!finished) {
        inputMetrics.incRecordsRead(1)
      }
      (key, value)
    }
At least an EOFException is handled here. However, for a compressed file the decompression itself fails, and the error is generally an IOException rather than an EOFException, so the program still stops.
Personally, I think some configuration options should be added so that users can choose how to handle files that are corrupted or cannot be decompressed.
Because we do not maintain a private build of Spark at this stage, we work around the problem by rewriting the related classes, such as FileInputDStream and NewHadoopRDD.
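The idea behind such a workaround can be shown without any Spark classes: wrap the record iterator so that an IOException ends the current file instead of propagating. `TolerantIterator` and its `onError` callback are hypothetical names used only for this sketch; the authors' actual changes to FileInputDStream and NewHadoopRDD are not shown in the article.

```scala
import java.io.IOException

// Hypothetical wrapper, not part of Spark: when the underlying reader fails with
// an IOException, report it and treat the rest of this source as exhausted.
final class TolerantIterator[A](underlying: Iterator[A], onError: IOException => Unit)
    extends Iterator[A] {
  private var failed = false
  private var buffered: Option[A] = None

  override def hasNext: Boolean = {
    if (buffered.isEmpty && !failed) {
      try {
        // Read one record ahead so that a failure surfaces here, not in next().
        if (underlying.hasNext) buffered = Some(underlying.next())
      } catch {
        case e: IOException =>
          failed = true // give up on the remainder of this file
          onError(e)
      }
    }
    buffered.nonEmpty
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    val value = buffered.get
    buffered = None
    value
  }
}
```

Records read before the failure are still emitted, so a partially readable file contributes partial data. Whether that is acceptable, or whether the whole file should be discarded instead, is exactly the kind of choice a configuration option would expose.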
Thorn: shuffle
Shuffle is an unavoidable pain point in Spark Streaming, especially when each batch carries a large amount of data, because each batch has to finish within a bounded time. In one of our scenarios with a five-minute batch interval, a repartition alone takes 2.1 minutes (including fetching the data from Kafka). At present, Spark's shuffle implementation has to write to disk, and the Shuffle Write and Shuffle Read phases are completely separate: the read phase cannot start until the write phase has finished. I think Spark Streaming needs its own faster, fully in-memory shuffle implementation.
Thorn: memory
In Spark Streaming you also run into the usual Spark problems, such as those related to lost Executors (shuffle fetch failures, Task failures and retries, and so on). These usually indicate insufficient memory or data skew. For now, consider the following points when looking for a solution:
With the same resources, increasing the number of partitions can reduce memory problems. With more partitions, each task processes less data, so the tasks running at the same time process much less data in total and every Executor needs less memory. This relieves the pressure from both data skew and insufficient memory.
Pay attention to the parallelism of the shuffle read phase. Functions in the reduce and group families accept an additional parameter for the degree of parallelism (the number of partitions), but it is usually left unset. If something goes wrong, try setting it explicitly.
Giving one Executor too many cores means more tasks run in it at the same time, so memory pressure on that Executor is higher and GC is more frequent. I usually keep it at about three cores and increase the number of Executors to keep the total resources the same.
Thorn: monitoring
The Executors tab of the Spark Streaming UI is missing one thing: details of memory GC on the Workers. Although we can export this information to a third-party monitoring system, that is not as convenient as seeing it in the Spark UI. For this reason, we have put this feature on our development roadmap.