
Example Analysis of Spark Structured Streaming De-duplication


This article walks through an example of de-duplication in Spark Structured Streaming. It is concise and easy to follow, and I hope you take something useful away from the detailed introduction.

De-duplication is painful in big data systems in general, and record-level de-duplication even more so, but Spark Structured Streaming makes this feature easy to implement.

Between data collection and final processing, a piece of data may be received and processed more than once at some point in the pipeline. For example, Kafka supports at-least-once write semantics: when data is written to Kafka, some records may be duplicated. If a message has already been received by a broker and written to the log but the acknowledgement is lost, the producer may resend that message to Kafka. Because of Kafka's at-least-once write semantics, Structured Streaming cannot avoid this kind of duplication; once a write succeeds, you can only assume that the query output of Structured Streaming is written to Kafka at least once. A feasible way to remove duplicate records is to introduce a primary (unique) key into the data, so that records can be de-duplicated when the data is read.
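As a hedged sketch of that idea (the events DataFrame, its guid column, and the checkpoint path are assumptions for illustration, reusing the broker and topic from the example later in this article), the producer side can attach the unique key when writing to Kafka:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

// Sketch only: embed a unique id in every outgoing record so that a
// downstream reader can de-duplicate on it. `events` and `guid` are
// illustrative names, not from the original code.
def writeWithUniqueKey(events: DataFrame): Unit = {
  events
    .select(
      col("guid").cast("string").alias("key"),   // unique record id, doubles as the Kafka message key
      to_json(struct(col("*"))).alias("value"))  // full payload serialized as JSON
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
    .option("topic", "jsontest")
    .option("checkpointLocation", "/tmp/guid-producer-ckpt")
    .start()
}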

In Structured Streaming, records in a data stream can be de-duplicated using a unique identifier carried in the event, exactly as in static de-duplication on a unique identifier column. The query stores a certain amount of data from previous records as state so that duplicates can be filtered out. As with aggregation, you can de-duplicate with or without a watermark.

A) With watermark: if there is an upper bound on how late a duplicate record can arrive, you can define a watermark on the event-time column and de-duplicate using both the guid and the event-time column (see the sketch after this list).

B) Without watermark: since there is no bound on when a duplicate record may arrive, the query stores the data from all past records as state.
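As a minimal sketch of the two variants (streamingDf, guid, and eventTime are illustrative names, not from the original code):

import org.apache.spark.sql.DataFrame

// Without watermark: state for every guid ever seen is kept indefinitely.
def dedupWithoutWatermark(streamingDf: DataFrame): DataFrame =
  streamingDf.dropDuplicates("guid")

// With watermark: duplicates are only expected within 10 seconds of event
// time, so old state can be purged. The event-time column must be among the
// de-duplication columns for the watermark to bound the state.
def dedupWithWatermark(streamingDf: DataFrame): DataFrame =
  streamingDf
    .withWatermark("eventTime", "10 seconds")
    .dropDuplicates("guid", "eventTime")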

Here is the source code, which has passed testing:

package bigdata.spark.StructuredStreaming.KafkaSourceOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object KafkaDropDuplicate {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
      .set("yarn.resourcemanager.hostname", "mt-mdh.local")
      .set("spark.executor.instances", "2")
      .set("spark.default.parallelism", "4")
      .set("spark.sql.shuffle.partitions", "4")
      .setJars(List("/opt/sparkjar/bigdata.jar"
        , "/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        , "/opt/jars/kafka-clients-0.10.2.2.jar"
        , "/opt/jars/kafka_2.11-0.10.2.2.jar"
        , "/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .config(sparkConf)
      .getOrCreate()

    import spark.implicits._

    // Subscribe to the Kafka topic that carries the JSON events.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "jsontest")
      .load()

    // Kafka values arrive as bytes; cast to string before parsing the JSON.
    val words = df.selectExpr("CAST(value AS STRING)")

    // Extract the event time and the de-duplication key from the payload.
    val fruit = words.select(
      get_json_object($"value", "$.time").alias("timestamp").cast("long")
      , get_json_object($"value", "$.clients").alias("fruit"))

    val fruitCast = fruit
      .select(fruit("timestamp").cast("timestamp"), fruit("fruit"))
      .withWatermark("timestamp", "10 seconds")
      // Include the event-time column alongside the key so the watermark
      // can purge old de-duplication state, as described in A) above.
      .dropDuplicates("fruit", "timestamp")
      .groupBy("fruit").count()

    fruitCast.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .trigger(Trigger.ProcessingTime(5000))
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}
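One hypothetical way to exercise the query is to feed the jsontest topic with Spark's built-in rate source. The snippet below is not part of the original program; the rowsPerSecond value and checkpoint path are made up for illustration. It writes JSON records with the same time/clients shape, cycling through three client names so that duplicates actually occur:

// Sketch only: generate duplicated test input for the query above.
// The rate source emits (timestamp, value) rows; value % 3 reuses three
// client names, producing the duplicates we want the query to drop.
val testFeed = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .selectExpr(
    "to_json(named_struct('time', CAST(unix_timestamp(timestamp) AS LONG), " +
      "'clients', CONCAT('client-', CAST(value % 3 AS STRING)))) AS value")

testFeed.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
  .option("topic", "jsontest")
  .option("checkpointLocation", "/tmp/jsontest-feed-ckpt")  // the Kafka sink requires a checkpoint location
  .start()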

The above is an example analysis of de-duplication in Spark Structured Streaming. Have you picked up new knowledge or skills? If you want to learn more skills or enrich your knowledge, you are welcome to follow the industry information channel.
