Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze the comparison between Flink and Spark Streaming

2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article shows you how to analyze the comparison between Flink and Spark Streaming. The content is concise and easy to understand. It will definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.

Preface

Stream data (or data stream) refers to an infinite series of dynamic data sets in time distribution and quantity. The value of data decreases with the passage of time, so it is necessary to calculate and give a second-order response in real time. Streaming computing, as the name implies, is the processing of data flow, is real-time computing.

Architecture comparison

Ecology

Running model

Spark Streaming is a micro-batch processing. When you run job, you need to specify the time of batch processing and process one batch of data each time.

Flink is event-driven, and events can be understood as messages. An event-driven application is a state application that injects events from one or more streams and reacts to injected events by triggering calculations to update the state, or external actions.

Running role

The main roles of the Spark Streaming runtime (standalone mode) are:

Master: mainly responsible for the overall cluster resource management and application scheduling

Worker: responsible for resource management of single node, startup of driver and executor, etc.

Driver: the place where the user entrances the program execution, that is, the place where SparkContext is executed, mainly DAG generation, stage partition, task generation and scheduling.

Executor: responsible for implementing task, feedback implementation status and implementation results.

The main roles of the Flink runtime (standalone mode) are:

Jobmanager: coordinate distributed execution, they schedule tasks, coordinate checkpoints, coordinate fault recovery, and so on. There is at least one JobManager. Multiple JobManager can be launched under high availability, one of which is elected as leader and the rest as standby

Taskmanager: responsible for executing specific tasks, caching, switching data streams, with at least one TaskManager

Slot: each task slot represents a fixed part of the TaskManager resource, and the number of Slot represents the number of task that the taskmanager can execute in parallel.

Comparison of programming models

The comparison of programming models is mainly to compare the differences in coding between flink and Spark Streaming.

Spark Streaming

The combination of Spark Streaming and kafka is mainly based on two models:

Based on receiver dstream

Based on direct dstream.

The programming mechanisms of the above two models are similar, but there are some differences between api and internal data acquisition. The new version has eliminated the receiver-based model, and the direct Dstream-based model is usually used in enterprises.

Val Array (brokers, topics) = args// create a batch time of 2s context val sparkConf = new SparkConf (). SetAppName ("DirectKafkaWordCount") val ssc = new StreamingContext (sparkConf, Seconds (2)) / / create DirectStream val topicsSet = topics.split (",") using broker and topic. ToSet val kafkaParams = Map [String, String] ("metadata.broker.list"-> brokers) val messages = KafkaUtils.createDirectStream [String, String] (ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe [String String] (topicsSet, kafkaParams) / / Get the lines, split them into words, count the words and print val lines = messages.map (_ .value) val words = lines.flatMap (_ .split (")) val wordCounts = words.map (x = > (x, 1L)). ReduceByKey (_ + _) wordCounts.print () / / launch stream ssc.start () ssc.awaitTermination ()

Through the above code, we can get to:

Set batch time

Create data flow

Write transform

Write action

Initiate execution

Flink

Let's take a look at how code is written in combination with flink and kafka. The combination of Flink and kafka is event-driven, and you may have doubts about this. When consuming kafka data, you can call poll to obtain data in batches (you can set the batch size and timeout), which cannot be called event trigger. In fact, the data from poll is sorted out internally by flink, and then emit one by one, forming an event trigger mechanism. The following code is that flink integrates kafka as data source and data sink:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); env.getConfig (). DisableSysoutLogging (); env.getConfig (). SetRestartStrategy (RestartStrategies.fixedDelayRestart (4, 10000)); env.enableCheckpointing (5000); / / create a checkpoint every 5 seconds env.getConfig (). SetGlobalJobParameters (parameterTool); / / make parameters available in the web interface env.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); / / ExecutionConfig.GlobalJobParameters env.getConfig (). SetGlobalJobParameters (null) DataStream input = env .addSource (new FlinkKafkaConsumer010 (parameterTool.getRequired ("input-topic"), new KafkaEventSchema (), parameterTool.getProperties ()) .assignTimestampsAndWatermarks (new CustomWatermarkExtractor ()) .setParallelism (1). Rebalance () .keyby ("word") .map (new RollingAdditionMapper ()) .setParallelism (0); input.addSink (new FlinkKafkaProducer010 (parameterTool.getRequired ("output-topic"), new KafkaEventSchema (), parameterTool.getProperties ()); env.execute ("Kafka 0.10 Example"))

From the code that combines Flink and kafka, you can get to:

Registration data source

Write running logic

Registration data sink

Calling env.execute takes less batch time to set up than Spark Streaming, and another significant difference is that all operators of flink are in the form of lazy, and calling env.execute builds jobgraph. The client side is responsible for generating Jobgraph and submitting it to the cluster to run, while the operators of Spark Streaming are action and transform, in which only transform is in the form of lazy, and DAG generation, stage partition and task scheduling are carried out on the driver side, and driver runs on the client in client mode.

Task scheduling principle

Spark task scheduling

The Spark Streaming task, as mentioned above, is based on micro-batch processing, and each batch is actually a Spark Core task. For the coded Spark Core task, it mainly includes the following parts from generation to final execution:

Build DAG diagrams

Partition stage

Generate taskset

Dispatch task.

For details, please refer to figure 5:

There are two modes for scheduling and execution of job: fifo and fair, and Task is scheduled and executed according to the locality of data. Suppose the kafka topic consumed by each Spark Streaming task has four partitions, with a transform operation (such as map) and a reduce operation in the middle, as shown in figure 6:

Suppose there are two executor, of which each executor has three cores, is the corresponding task running location for each batch fixed? Can you predict it? Due to the data locality and scheduling uncertainty, the task running location generated by each batch corresponding to the kafka partition is not fixed.

Flink task scheduling

The flow task client for flink will first generate StreamGraph, then generate JobGraph, then submit the jobGraph to Jobmanager to complete the transformation from jobGraph to ExecutionGraph, and finally be scheduled and executed by jobManager.

As shown in figure 7, there is a program composed of data source, MapFunction, and ReduceFunction. The concurrency of data source and MapFunction is 4, while that of ReduceFunction is 3. A data stream consists of the order of Source-Map-Reduce, running on a cluster with 2 TaskManager and 3 Task Slot for each TaskManager.

It can be seen that after the topology generation submission of flink is executed, unless there is a fault, the execution position of the topology component remains unchanged, and the degree of parallelism is determined by the parallelism of each operator, similar to storm. Spark Streaming is that each batch is scheduled according to the locality of data and resources, and there is no fixed execution topology. Flink is the flow and execution of data in the topology, while Spark Streaming is the parallel processing of data cache batches.

Comparison of time mechanism

Time for stream processing

The flow processor has a total of three concepts of time in terms of time:

Processing time

The processing time refers to the system time of each machine, and the machine time that runs each operator instance will be used when the stream program uses the processing time. Processing time is the simplest concept of time, which does not require coordination between the flow and the machine. It provides the best performance and lowest latency. However, in distributed and asynchronous environments, processing time can not provide the timing guarantee of message events, because it is restricted by message transmission delay, the speed of message flow between operators and so on.

Event time

The event time is the time that the event occurs on its device, which is embedded before the event enters the flink, and then the flink can extract that time. The flow program based on event time can ensure the sequence of event processing, but the application based on event time must be combined with watermark mechanism. Event time-based processing often has a certain lag, because it needs to wait for follow-up events and deal with out-of-order events, which should be carefully considered when using time-sensitive applications.

Injection time

The injection time is the time that the event is injected into the flink. The event gets the current time of the source at the source operator as the event injection time, which is used by subsequent time-based processors to process the data.

Compared to the event time, the injection time cannot handle unordered or lagged events, but the application disorder specifies how the watermark is generated. The processing of internally injected time programs is similar to event time, but timestamp allocation and watermark generation are automatic.

Figure 8 can clearly see the difference between the three kinds of time:

Spark time mechanism

Spark Streaming only supports processing time, Structured streaming supports processing time and event time, and supports watermark mechanism to deal with lagging data.

Flink time mechanism

Flink supports three time mechanisms: event time, injection time, processing time, and watermark mechanism to deal with lag data.

Kafka dynamic partition detection

Spark Streaming

For enterprises with real-time business needs, the amount of data will increase synchronously with business growth, which will cause the original number of kafka partitions not to meet the concurrency required for data writing, and it is necessary to expand the partition of kafka or increase the topic of kafka. Real-time processing programs, such as SparkStreaming and flink, are required to detect the new topic of kafka, partition and consume the new partition data.

Next, combined with the source code analysis, Spark Streaming and flink can dynamically find the new partition and consume the data to deal with the new partition when kafka adds topic or partition. There are two different versions of Spark Streaming and kafka, as shown in figure 9 is the comparative data given on the official website:

It is confirmed that the combination of Spark Streaming and version 0.8 of kafka does not support dynamic partition detection, combined with version 0.10, and then through source code analysis.

Combination of Spark Streaming and kafka version 0.8

* Source code analysis is only for partition detection

The entry is the compute of DirectKafkaInputDStream:

Override def compute (validTime: Time): Option [KafkaRDD [K, V, U, T, R]] = {/ / the line change will calculate this job, and the maximum offset val untilOffsets of each kafka partition to be consumed is val untilOffsets = clamp (latestLeaderOffsets (maxRetries)) / / build KafkaRDD With the specified number of partitions and the offset range to be consumed val rdd = KafkaRDD [K, V, U, T, R] (context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) / / Report the record number and metadata of this batch interval to InputInfoTracker. Val offsetRanges = currentOffsets.map {case (tp, fo) = > val uo = untilOffsets (tp) OffsetRange (tp.topic, tp.partition, fo, uo.offset)} val description = offsetRanges.filter {offsetRange = > / / Don't display empty ranges. OffsetRange.fromOffset! = offsetRange.untilOffset} .map {offsetRange = > s "topic: ${offsetRange.topic}\ tpartition: ${offsetRange.partition}\ t" + s "offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"} .mkString ("\ n") / / Copy offsetRanges to immutable.List to prevent from being modified by the user val metadata = Map ("offsets"-> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION-> description) val inputInfo = StreamInputInfo (id, rdd.count) Metadata) ssc.scheduler.inputInfoTracker.reportInfo (validTime, inputInfo) currentOffsets = untilOffsets.map (kv = > kv._1-> kv._2.offset) Some (rdd)}

The first line is to calculate the maximum offset to be consumed by each partition of the batch generated KafkaRDD. Then take a look at latestLeaderOffsets (maxRetries)

@ tailrec protected final def latestLeaderOffsets (retries: Int): Map [TopicAndPartition, LeaderOffset] = {/ / you can see whether it is used to specify the list of maximum offset partitions or just currentOffsets, and nothing about the new partition is found. Val o = kc.getLatestLeaderOffsets (currentOffsets.keySet) / / Either.fold would confuse @ tailrec, do it manually if (o.isLeft) {val err = o.left.get.toString if (retries kv._1-> kv._2.offset)

There is no code to detect the new topic or partition of kafka, so you can confirm that the combination of Spark Streaming and kafka 0.8 does not support dynamic partition detection.

Spark Streaming combines with kafka version 0.10

The entry is also the compute method of DirectKafkaInputDStream. Pick up the main part and say that the first line in Compute is also the maximum offset for each partition to be consumed by the current job to generate kafkardd:

/ / get the maximum consumption offset of the KafkaRDD to be used for each partition val untilOffsets = clamp (latestOffsets ()) for the current generated job

The code for detecting the new topic or partition of kafka is found in latestOffsets ()

/ * Returns the latest (highest) available offsets, taking new partitions into account. * / protected def latestOffsets (): Map [TopicPartition, Long] = {val c = consumer paranoidPoll (c) / / get all partition information val parts = c.assignment (). AsScala / / make sure new partitions are reflected in currentOffsets / / get the new partition information val newPartitions = parts.diff (currentOffsets.keySet) / / position for new partitions determined by auto.offset.reset if no commit / / the new partition consumption location It is up to auto.offset.reset to determine currentOffsets = currentOffsets + + newPartitions.map (tp = > tp-> c.position (tp)). ToMap / / don't want to consume messages, so pause c.pause (newPartitions.asJava) / / find latest available offsets c.seekToEnd (currentOffsets.keySet.asJava) parts.map (tp = > tp-> c.position (tp)). ToMap}

There is a process in this method to get the new partition of kafka and update it to currentOffsets, so it can be verified that Spark Streaming and kafka version 0.10 support dynamic partition detection.

Flink

The entry class is FlinkKafkaConsumerBase, which is the parent class of all flink kafka consumers.

In the run method of FlinkKafkaConsumerBase, the kafkaFetcher is created, which is actually the consumer:

This.kafkaFetcher = createFetcher (sourceContext, subscribedPartitionsToStartOffsets, periodicWatermarkAssigner, punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext (), offsetCommitMode, getRuntimeContext (). GetMetricGroup (). AddGroup (KAFKA_CONSUMER_METRICS_GROUP), useMetrics)

Then a thread is created that periodically detects the new partition of the kafka and then adds it to the kafkaFetcher.

If (discoveryIntervalMillis! = PARTITION_DISCOVERY_DISABLED) {final AtomicReference discoveryLoopErrorRef = new AtomicReference (); this.discoveryLoopThread = new Thread (new Runnable () {@ Override public void run () {try {/ /-partition discovery loop-List discoveredPartitions / / throughout the loop, we always eagerly check if we are still running before / / performing the next operation, so that we can escape the loop as soon as possible while (running) {if (LOG.isDebugEnabled ()) {LOG.debug ("Consumer subtask {} is trying to discover new partitions...", getRuntimeContext (). GetIndexOfThisSubtask ());} try {discoveredPartitions = partitionDiscoverer.discoverPartitions ();} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {/ / the partition discoverer may have been closed or woken up before or during the discovery / / this would only happen if the consumer was canceled; simply escape the loop break;} / / no need to add the discovered partitions if we were closed during the meantime if (running & &! discoveredPartitions.isEmpty ()) {kafkaFetcher.addDiscoveredPartitions (discoveredPartitions);} / do not waste any time sleeping if we're not running anymore if (running & & discoveryIntervalMillis! = 0) {try {Thread.sleep (discoveryIntervalMillis);} catch (InterruptedException iex) {/ / may be interrupted if the consumer was canceled midway; simply escape the loop break } catch (Exception e) {discoveryLoopErrorRef.set (e);} finally {/ / calling cancel will also let the fetcher loop escape / / (if not running, cancel () was already called) if (running) {cancel ();}, "Kafka Partition Discovery for" + getRuntimeContext (). GetTaskNameWithSubtasks (); discoveryLoopThread.start (); kafkaFetcher.runFetchLoop ()

Above is the process of flink dynamically discovering new partitions in kafka. However, unlike Spark, which does not require any configuration, flink dynamically discovers that new partitions are added to kafka, which needs to be enabled. It's also very simple, you need to set the flink.partition-discovery.interval-millis property to greater than 0.

Fault-tolerant Mechanism and processing semantics

The main purpose of this section is to compare the two in fault recovery and how to ensure only one-time processing semantics. This is a good time to throw out a question: when processing in real time, how to ensure that the data is processed semantically only once?

Spark Streaming guarantees only one processing.

For the Spark Streaming task, we can set checkpoint, and then if there is a failure and restart, we can recover from the last checkpoint, but this behavior can only keep the data from being lost and may be processed repeatedly, not the semantics at once.

For direct Stream with the combination of Spark Streaming and kafka, you can maintain the offset to zookeeper, kafka or any other external system, and then submit the offset after each result submission, so that the failure recovery restart can take advantage of the last submitted offset recovery to ensure that the data is not lost. However, if the failure occurs after the result is submitted and before the offset is submitted, the data will be processed multiple times. At this time, we need to ensure that the multiple output of the processing result will not affect the normal business.

As a result, it can be analyzed that, assuming that the data is to be processed semantically exactly at once, then the result output and offset commit must be done in one transaction. There are two ways to do this:

The action output from repartition (1) Spark Streaming becomes a single partition, which can be done with transactions:

Dstream.foreachRDD (rdd= > {rdd.repartition (1) .foreachPartition (partition= > {/ / Open transaction partition.foreach (each= > {/ / commit data}) / / commit transaction}))

Submit the result with offset

That is, the resulting data contains offset. In this way, submitting the result and submitting the offset is a complete operation, and there is no data loss or repeated processing. The offset with the result of the last submission can be used when the fault is recovered.

Flink and kafka 0.11 guarantee only one processing.

For sink to support semantics only once, data must be written to Kafka in a transactional manner, so that all writes between the two checkpoint are committed as one transaction when the transaction is committed. This ensures that these writes can be rolled back in the event of a failure or crash.

In a distributed application with multiple concurrent execution sink, it is not enough to perform a single commit or rollback, because all components must agree on these commits or rollbacks in order to ensure consistent results. Flink uses the two-phase commit protocol and the pre-commit (pre-commit) phase to solve this problem.

The Flink application in this example is shown in figure 11 with the following components:

A source that reads data from Kafka (that is, KafkaConsumer)

A time-windowed party operation

A sink that writes the result back to Kafka (that is, KafkaProducer)

The following is a detailed explanation of the two submission ideas of flink:

As shown in figure 12, Flink checkpointing enters the pre-commit phase at the beginning. Specifically, once the checkpoint starts, the JobManager of Flink writes a checkpoint barrier to the input stream, dividing all messages in the stream into messages belonging to this checkpoint and the next checkpoint, and barrier will also flow between operators. For each operator, the barrier triggers the operator status backend to take a snapshot of the operator status. Data source saves the offset of the Kafka and then passes the checkpoint barrier to the subsequent operator.

This approach only applies to operator with only its internal state. The internal state refers to the content saved and managed by Flink state backends (such as the sum aggregated by window in the second operator).

When a process has only its internal state, no action is required during the pre-commit phase except that data changes need to be written to state backend before checkpoint. When the checkpoint succeeds, Flink commits the writes correctly and terminates the commit if the checkpoint fails, as shown in figure 13.

When combined with an external system, the external system must support transactions that can be bundled with the two-phase commit protocol. Obviously, the sink in this example introduces kafka sink, so data sink must pre-commit external transactions during the pre-commit phase. As shown below:

When the barrier is passed through all operators and the triggered snapshot write is complete, the pre-commit phase is complete. All trigger state snapshots are treated as part of the checkpoint, or checkpoint is a status snapshot of the entire application, including pre-committed external states. If there is a failure, you can recover from checkpoint. The next step is to notify all operators that checkpoint is successful. At this stage, jobmanager initiates callback logic that has been completed by checkpoint for each operator.

In this example, the data source and window operations have no external state, so at this stage, the two operators do not need to perform any logic, but data sink has an external state, so we must commit the external transaction at this time, as shown in the following figure:

These are the basic logic of the flink implementation to process exactly once.

Back pressure

The speed of consumer consumption is lower than that of producer production. In order to make the application normal, consumers will feedback to producers to adjust the speed of producer production, so that how much consumers need and how much producers produce.

* after back pressure, it is called back pressure.

Back pressure of Spark Streaming

There is a back pressure mechanism in the combination of Spark Streaming and kafka, and the goal is to adjust the number of kafka messages in subsequent batches according to the current processing of job. In order to achieve this purpose, Spark Streaming adds a RateController to the original architecture, using the algorithm PID, and the feedback data needed are the end time of task processing, scheduling time, processing time, and the number of messages. These data are obtained through the SparkListener system, and then a rate is calculated by PIDRateEsimator's compute, and then an offset can be calculated. Then the maximum offset of a message to be consumed is obtained by comparing with the maximum number of consumption messages set by the speed limit.

The compute method of PIDRateEsimator is as follows:

Def compute (time: Long, / / in milliseconds numElements: Long, processingDelay: Long, / / in milliseconds schedulingDelay: Long / / in milliseconds): Option [Double] = {logTrace (s "\ ntime = $time, # records = $numElements," + s "processing time = $processingDelay Scheduling delay = $schedulingDelay ") this.synchronized {if (time > latestTime & & numElements > 0 & & processingDelay > 0) {val delaySinceUpdate = (time-latestTime). ToDouble / 1000 val processingRate = numElements.toDouble / processingDelay * 1000 val error = latestRate-processingRateval historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis / / in elements/ (second ^ 2) val dError = (error-latestError) / delaySinceUpdateval newRate = (latestRate-proportional * error-integral * historicalError-derivative * dError) .max (minRate) logTrace (s"| logTrace = $latestRate) Error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError "" .stripMargin) latestTime = time if (firstRun) {latestRate = processingRate latestError = 0D firstRun = false logTrace ("First run, rate estimation skipped") None} else {latestRate = newRate latestError = error logTrace (s "New rate = $newRate") Some (newRate)} else {logTrace ("Rate estimation skipped") None}

Back pressure of Flink

Unlike Spark Streaming's back pressure, Flink back pressure is that jobmanager triggers 100 Thread.getStackTrace () calls per task per 50ms to calculate the percentage of blocking. The process is shown in figure 16:

The blocking ratio is divided into three levels on web:

OK: 0

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report