In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Spark provides two ways to read kafka data streams: createDstream and createDirectStream. The differences between the two are as follows: 1. KafkaUtils.createDstream
The constructor is KafkaUtils.createDstream (ssc, [zk], [consumer group id], [per-topic,partitions])
Receivers is used to receive data, using Kafka's high-level consumer api. All data received by receivers will be saved in Spark executors, and then job will be started through Spark Streaming to process the data, which will be lost by default. WAL log can be enabled, which is stored on HDFS.
A. Create a receiver to pull data from kafka regularly. The rdd partition of ssc and the topic partition of kafka are not the same concept, so if increasing the number of specific principal partitions only increases the number of threads consuming topic in a receiver, it does not increase the number of parallel processing data in spark.
B. For different group and topic, you can use multiple receivers to create different DStream
C. If WAL is enabled, you need to set the storage level, that is, KafkaUtils.createStream (… ., StorageLevel.MEMORY_AND_DISK_SER)
2.KafkaUtils.createDirectStream
Distinguishes Receiver from receiving data, which periodically queries kafka's topic+partition for the latest offset, and then processes the data in each batch according to the offset range, using kafka's simple consumer api
Advantages:
A, simplify parallelism, do not need multiple kafka input streams, this method will create the same number of rdd as kafka partitions, and will read from kafka in parallel.
B, efficient, this method does not require WAL,WAL mode to copy the data twice, the first is copied by kafka, and the other is written to wal
C, just one-time semantics (Exactly-once-semantics), the traditional way to read kafka data is to write the offset into zookeeper through kafka high-level api. The possibility of data loss is that the offset in zookeeper is inconsistent with that of ssc. EOS implements kafka low-level api, and the offset is only stored in checkpoint by ssc, which eliminates the problem of inconsistency between zk and ssc offsets. The disadvantage is that the kafka monitoring tool based on zookeeper cannot be used.
Public void adclick () {
SparkConf conf = new SparkConf ()
.setAppName ("")
.setMaster ("")
JavaStreamingContext jssc = new JavaStreamingContext (conf,Durations.seconds (10))
Jssc.checkpoint ("")
Map kafkaParams = new HashMap ()
KafkaParams.put ("metadata.broker.list", ConfigurationManager.getProperty ("metadata.broker.list"))
String kafkaTopics = ConfigurationManager.getProperty ("kafkaTopics")
String [] kafkaTopicsSplits = kafkaTopics.split (",")
Set tops = new HashSet ()
For (String xx:kafkaTopicsSplits) {
Tops.add (xx)
}
JavaPairInputDStream adRealTimeDStream = KafkaUtils.
CreateDirectStream (
Jssc
String.class
String.class
StringDecoder.class
StringDecoder.class
KafkaParams
Tops)
Jssc.start ()
Jssc.awaitTermination ()
Jssc.close ()
}
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.