Shulou (Shulou.com), 06/01 report. Updated 2025-01-18. From: SLTechnology News&Howtos
This article describes the problems encountered when integrating Spark Streaming with Kafka and how they were solved.
Preface
My recent work has been building a log analysis platform with Spark Streaming and Kafka. Kafka was chosen mainly for its high throughput on large volumes of data, which suits log processing very well. Spark Streaming was chosen as the stream processing framework because it is built on Spark core, so future batch processing can run on the same stack, and because it can feed Elasticsearch in near real time, which makes it possible to locate system logs in near real time.
Implementation
There are two ways for Spark Streaming to read data from Kafka: the Receiver-based approach and the Direct approach.
1. Receiver-based approach
This approach uses a Receiver to get the data. The Receiver is implemented with Kafka's high-level consumer API. The data it reads from Kafka is stored in the memory of the Spark executors, and the jobs started by Spark Streaming then process that data. The code is as follows:
    import java.util.*;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    SparkConf sparkConf = new SparkConf().setAppName("log-etl").setMaster("local[4]");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt("4");
    // topic name -> number of consumer threads
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("group-45", numThreads);

    // Arguments: the JavaStreamingContext, the ZooKeeper connection string,
    // the consumer group id, and the Kafka topics
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
            jssc, "172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181", "1", topicMap);
At first the system ran normally and no problems were found. But when the Spark Streaming program was restarted after an abnormal exit, it processed data that had already been processed. The receiver-based approach uses Kafka's high-level API, which saves the consumed offsets in ZooKeeper. This is the traditional way to consume Kafka data. Combined with the write-ahead log (WAL) mechanism it can guarantee zero data loss, but it cannot guarantee that each record is processed exactly once. A record may be processed twice, because the offsets Spark has actually processed and the offsets recorded in ZooKeeper can be out of sync. The official documentation no longer recommends this integration method; see http://spark.apache.org/docs/latest/streaming-kafka-integration.html. So let's use the second method, the one the official documentation recommends: the createDirectStream() method of KafkaUtils.
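The duplicate processing described above comes from the fact that processing a batch and recording its offset are two separate steps. The following is a minimal plain-Java sketch of that failure mode, not Spark or Kafka code. The class ReceiverReplayModel, the in-memory list standing in for a Kafka partition, and the savedOffset field standing in for the offset kept in ZooKeeper are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (hypothetical): process a batch, then save its offset as a separate step. */
class ReceiverReplayModel {
    private final List<String> log;                                  // stands in for one Kafka partition
    private long savedOffset = 0;                                    // stands in for the offset in ZooKeeper
    private final List<String> processed = new ArrayList<String>();  // everything the job has output

    ReceiverReplayModel(List<String> log) {
        this.log = log;
    }

    /**
     * Processes up to batchSize messages starting at the saved offset.
     * When crashBeforeCommit is true the output is produced but the offset
     * is not saved, which models a failure between the two steps.
     */
    void runBatch(int batchSize, boolean crashBeforeCommit) {
        int from = (int) savedOffset;
        int until = Math.min(log.size(), from + batchSize);
        processed.addAll(log.subList(from, until)); // step 1: process the data
        if (!crashBeforeCommit) {
            savedOffset = until;                    // step 2: record the offset
        }
    }

    List<String> processed() {
        return processed;
    }

    long savedOffset() {
        return savedOffset;
    }

    public static void main(String[] args) {
        ReceiverReplayModel job = new ReceiverReplayModel(Arrays.asList("m0", "m1", "m2", "m3"));
        job.runBatch(2, true);  // processes m0 and m1, then fails before saving the offset
        job.runBatch(2, false); // after the restart, m0 and m1 are processed again
        System.out.println(job.processed()); // prints [m0, m1, m0, m1]
    }
}
```

Nothing is lost in this model, but m0 and m1 are emitted twice, which is the at-least-once behaviour seen after the abnormal restart.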
2. Direct approach
This newer direct approach, which does not use a Receiver, was introduced in Spark 1.3 to provide stronger end-to-end guarantees. Instead of using a Receiver to receive data, it periodically queries Kafka for the latest offset of each topic+partition and uses that to define the offset range of each batch. When the job that processes the data starts, Kafka's simple consumer API is used to read the data in the specified offset range from Kafka.
The code is as follows:
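    import java.util.*;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // brokers: comma-separated host:port list; topics: comma-separated topic names
    SparkConf sparkConf = new SparkConf().setAppName("log-etl");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);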
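To make the per-batch offset ranges of the direct approach concrete, here is a small plain-Java model of the bookkeeping. It is a sketch under assumptions, not how Spark implements it: BatchRangePlanner and Range are hypothetical names, partitions are plain integers, and the latestOffsets map stands in for what would be learned by querying Kafka at the start of each batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (hypothetical) of turning "latest offsets" into per-batch offset ranges. */
class BatchRangePlanner {

    /** Half-open offset range [fromOffset, untilOffset) for one partition. */
    static final class Range {
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        long count() {
            return untilOffset - fromOffset;
        }
    }

    // partition -> offset at which the next batch starts
    private final Map<Integer, Long> nextOffsets = new TreeMap<Integer, Long>();

    BatchRangePlanner(Map<Integer, Long> startOffsets) {
        nextOffsets.putAll(startOffsets);
    }

    /** Returns one range per partition, ordered by partition number. */
    List<Range> planBatch(Map<Integer, Long> latestOffsets) {
        List<Range> ranges = new ArrayList<Range>();
        for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
            long from = entry.getValue();
            Long latest = latestOffsets.get(entry.getKey());
            // A partition with no new data (or no reported offset) gets an empty range.
            long until = (latest == null) ? from : Math.max(from, latest);
            ranges.add(new Range(entry.getKey(), from, until));
            entry.setValue(until); // the next batch starts where this one ends
        }
        return ranges;
    }
}
```

Because every batch is fully described by its ranges, a batch can be re-read from Kafka by offset if it has to be recomputed, which is why this model needs neither a receiver nor a second copy of the data.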
The advantages of this direct approach are as follows:
1. Simplified parallelism: to read multiple partitions you don't need to create multiple input DStreams and then union them. Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
2. Exactly-once semantics: in the receiver-based approach the offsets are kept in ZooKeeper, and Spark and ZooKeeper can get out of sync, which is likely to lead to data inconsistencies. In the direct approach Spark Streaming tracks the offsets itself instead of relying on ZooKeeper, which removes that source of inconsistency.
3. Efficiency: with a receiver, guaranteeing zero data loss requires enabling the WAL mechanism. The data is then effectively stored twice, once in Kafka itself and once in the WAL. The direct approach does not need this second copy.
3. Message loss with the Direct approach
This approach looks perfect, but it still has a problem. When the Spark Streaming program has to be restarted for business reasons, business logs keep flowing into Kafka. After the job restarts it can only consume from the latest offset, so the messages that arrived during the restart are lost.
[Figure: offsets in Kafka, with the messages in the queue monitored in real time using Kafka Manager]
After stopping the incoming business logs and restarting the Spark program, the job did not consume the data that had already entered Kafka. This is because the direct approach does not go through ZooKeeper, so the topic's offsets are not saved anywhere.
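The size of the gap can be worked out with simple arithmetic. The sketch below is a plain-Java illustration with hypothetical names (RestartLossModel, StartPolicy); it only models where a restarted job begins reading, and assumes the job had fully processed everything below lastProcessedUntil before it stopped.

```java
/** Toy arithmetic (hypothetical) for messages skipped across a restart. */
class RestartLossModel {

    enum StartPolicy { LATEST, SAVED }

    /** Offset at which the restarted job begins reading. */
    static long startOffset(StartPolicy policy, long lastProcessedUntil, long latestOffset) {
        return policy == StartPolicy.LATEST ? latestOffset : lastProcessedUntil;
    }

    /** Messages in [lastProcessedUntil, startOffset) are never read. */
    static long lostMessages(StartPolicy policy, long lastProcessedUntil, long latestOffset) {
        return startOffset(policy, lastProcessedUntil, latestOffset) - lastProcessedUntil;
    }

    public static void main(String[] args) {
        // The job had processed offsets [0, 100); 50 more messages arrived while it was down.
        System.out.println(lostMessages(StartPolicy.LATEST, 100, 150)); // prints 50
        System.out.println(lostMessages(StartPolicy.SAVED, 100, 150));  // prints 0
    }
}
```

Starting from the latest offset skips exactly the messages written during the downtime, while starting from a saved offset skips none, which is the motivation for persisting offsets.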
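4. A solution to message loss
There are generally two ways to deal with this problem. The first is to let Spark Streaming save the offsets through the Spark checkpoint mechanism. The second is to implement the offset-saving logic in the program itself. I prefer the second, because it is more controllable and keeps all the initiative in our own hands.
The general flow is: on startup, look up the offsets saved for the consumer group; if none have been saved, start every partition from offset 0; create the direct stream from those offsets; after each batch has been processed, write that batch's end offsets back.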
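Before the full Spark program, the save-it-yourself option can be reduced to two operations: resolve the starting offsets at startup, and save the end offsets after each batch. The following plain-Java sketch shows only that pattern. OffsetStoreFlow and its in-memory OffsetStore are hypothetical stand-ins for whatever persistent store is used, and partitions are plain integers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (hypothetical) of "load saved offsets at startup, save them after each batch". */
class OffsetStoreFlow {

    /** In-memory stand-in for wherever the offsets are persisted. */
    static final class OffsetStore {
        private final Map<String, Long> saved = new HashMap<String, Long>();

        private static String key(String group, String topic, int partition) {
            return group + "/" + topic + "/" + partition;
        }

        /** Returns the saved offset, or null if this group has none for the partition. */
        Long load(String group, String topic, int partition) {
            return saved.get(key(group, topic, partition));
        }

        void save(String group, String topic, int partition, long untilOffset) {
            saved.put(key(group, topic, partition), untilOffset);
        }
    }

    /** Startup: use the saved offset per partition, or 0 if the group has none yet. */
    static Map<Integer, Long> resolveStartOffsets(OffsetStore store, String group,
                                                  String topic, int partitionCount) {
        Map<Integer, Long> start = new TreeMap<Integer, Long>();
        for (int p = 0; p < partitionCount; p++) {
            Long savedOffset = store.load(group, topic, p);
            start.put(p, savedOffset == null ? 0L : savedOffset);
        }
        return start;
    }

    /** After a batch has been processed: persist each partition's end offset. */
    static void commitBatch(OffsetStore store, String group, String topic,
                            Map<Integer, Long> untilOffsets) {
        for (Map.Entry<Integer, Long> e : untilOffsets.entrySet()) {
            store.save(group, topic, e.getKey(), e.getValue());
        }
    }
}
```

Saving only after the batch's output is done means a crash in between replays that batch rather than skipping it, so this pattern trades possible duplicates for no loss.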
    import java.util.*;
    import java.util.concurrent.atomic.AtomicReference;
    import kafka.common.TopicAndPartition;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka.*;
    import scala.Predef;
    import scala.Tuple2;
    import scala.collection.JavaConversions;

    public class KafkaOffsetExample {
        private static KafkaCluster kafkaCluster;
        private static final HashMap<String, String> kafkaParam = new HashMap<String, String>();
        private static Broadcast<HashMap<String, String>> kafkaParamBroadcast;

        // transform a java Map to a scala immutable.Map
        private static <K, V> scala.collection.immutable.Map<K, V> toScalaMap(Map<K, V> javaMap) {
            scala.collection.mutable.Map<K, V> mutableMap = JavaConversions.mapAsScalaMap(javaMap);
            return mutableMap.toMap(new Predef.$less$colon$less<Tuple2<K, V>, Tuple2<K, V>>() {
                public Tuple2<K, V> apply(Tuple2<K, V> v1) {
                    return v1;
                }
            });
        }

        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("log-etl");

            Set<String> topicSet = new HashSet<String>();
            topicSet.add("group-45");
            kafkaParam.put("metadata.broker.list", "172.16.206.17:9092,172.16.206.31:9092");
            kafkaParam.put("group.id", "simple1");
            final String groupId = kafkaParam.get("group.id");

            // init KafkaCluster
            kafkaCluster = new KafkaCluster(toScalaMap(kafkaParam));

            scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
            scala.collection.immutable.Set<String> immutableTopics = mutableTopics.toSet();
            scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet2 =
                    kafkaCluster.getPartitions(immutableTopics).right().get();
            Set<TopicAndPartition> topicAndPartitionSet1 =
                    JavaConversions.setAsJavaSet(topicAndPartitionSet2);

            // offsets used to initialize the kafka direct stream
            Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();

            // No offsets saved yet (the group is consuming for the first time):
            // every partition's offset defaults to 0
            if (kafkaCluster.getConsumerOffsets(groupId, topicAndPartitionSet2).isLeft()) {
                System.out.println(
                        kafkaCluster.getConsumerOffsets(groupId, topicAndPartitionSet2).left().get());
                for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
                    consumerOffsetsLong.put(topicAndPartition, 0L);
                }
            }
            // Offsets already exist: use the saved offsets
            else {
                scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp =
                        kafkaCluster.getConsumerOffsets(groupId, topicAndPartitionSet2).right().get();
                Map<TopicAndPartition, Object> consumerOffsets =
                        JavaConversions.mapAsJavaMap(consumerOffsetsTemp);
                for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
                    Long offset = (Long) consumerOffsets.get(topicAndPartition);
                    consumerOffsetsLong.put(topicAndPartition, offset);
                }
            }

            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
            kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);

            // create direct stream, starting from the offsets resolved above
            JavaInputDStream<String> message = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    String.class, kafkaParam, consumerOffsetsLong,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            System.out.println("received data = " + v1.message());
                            return v1.message();
                        }
                    });

            // get the offsets of each partition of the rdd and keep them in offsetRanges
            final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
            JavaDStream<String> javaDStream = message.transform(
                    new Function<JavaRDD<String>, JavaRDD<String>>() {
                        public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                            offsetRanges.set(offsets);
                            return rdd;
                        }
                    });

            // output (Spark 1.x foreachRDD signature)
            javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
                public Void call(JavaRDD<String> v1) throws Exception {
                    if (v1.isEmpty()) return null;

                    List<String> list = v1.collect();
                    for (String s : list) {
                        System.out.println("data = " + s);
                    }

                    // topic+partition -> end offset of this batch
                    Map<TopicAndPartition, Object> untilOffsets = new HashMap<TopicAndPartition, Object>();
                    for (OffsetRange o : offsetRanges.get()) {
                        untilOffsets.put(new TopicAndPartition(o.topic(), o.partition()), o.untilOffset());
                        System.out.println("original data = " + o.topic() + " " + o.partition()
                                + " " + o.fromOffset() + " " + o.untilOffset());
                    }

                    // update the offsets through kafkaCluster once the batch has been output
                    kafkaCluster.setConsumerOffsets(
                            kafkaParamBroadcast.getValue().get("group.id"), toScalaMap(untilOffsets));
                    return null;
                }
            });

            jssc.start();
            jssc.awaitTermination();
        }
    }
Basically, the problem of data loss can be solved in this way.
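One caveat the program above does not handle: a starting offset of 0, or an old saved offset, may no longer exist if Kafka's retention policy has already deleted the oldest messages, and a saved offset can also be ahead of the log if the topic was recreated. A common precaution is to clamp the starting offset into the range the broker still holds. The helper below is a hypothetical plain-Java sketch of that check; how the earliest and latest offsets are obtained is left out, and both are simply assumed to be known inputs.

```java
/** Hypothetical helper: keep a starting offset inside the range the broker still holds. */
class OffsetClamp {

    /**
     * @param savedOffset    offset loaded from the offset store (or the default, 0)
     * @param earliestOffset smallest offset still available for the partition (assumed known)
     * @param latestOffset   offset of the next message to be written (assumed known)
     */
    static long clampToAvailable(long savedOffset, long earliestOffset, long latestOffset) {
        if (earliestOffset > latestOffset) {
            throw new IllegalArgumentException("earliestOffset must not exceed latestOffset");
        }
        return Math.max(earliestOffset, Math.min(savedOffset, latestOffset));
    }
}
```

For example, with an available range of 30 to 100, a saved offset of 0 would be moved up to 30 and a saved offset of 120 would be moved down to 100. Messages below the earliest offset are already gone from Kafka, so clamping avoids a failed read but cannot recover them.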