How to realize dual-stream join by Flink

2025-01-17 Update From: SLTechnology News&Howtos

This article explains how Flink implements a dual-stream join, with example code for each of the available operators and a look at how interval join works internally.

When running OLAP analysis on static database tables, joining two tables is a very common operation. Similarly, a streaming job sometimes needs to join two streams to obtain richer information. The Flink DataStream API provides three operators for dual-stream joins:

join()

coGroup()

intervalJoin()

Prepare data

Read the clickstream and the order stream from Kafka, and convert each record to a POJO.

    DataStream<String> clickSourceStream = env
        .addSource(new FlinkKafkaConsumer011<>(
            "ods_analytics_access_log",
            new SimpleStringSchema(),
            kafkaProps
        ).setStartFromLatest());

    DataStream<String> orderSourceStream = env
        .addSource(new FlinkKafkaConsumer011<>(
            "ods_ms_order_done",
            new SimpleStringSchema(),
            kafkaProps
        ).setStartFromLatest());

    // Parse each JSON message into its POJO
    DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
        .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));

    DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
        .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));

join()

The join() operator provides window join semantics: it performs an inner join on the specified key within a (tumbling / sliding / session) window, and it supports both processing time and event time. The following example uses a 10-second tumbling window to join the two streams on merchandise ID and pick up the price-related fields from the order stream.

    clickRecordStream
        .join(orderRecordStream)
        .where(record -> record.getMerchandiseId())
        .equalTo(record -> record.getMerchandiseId())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
            @Override
            public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
                // Emit the merchandise ID and the price fields as one tab-separated line
                return StringUtils.join(Arrays.asList(
                    accessRecord.getMerchandiseId(),
                    orderRecord.getPrice(),
                    orderRecord.getCouponMoney(),
                    orderRecord.getRebateAmount()
                ), '\t');
            }
        })
        .print().setParallelism(1);

Easy to use.
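To make the window-join semantics concrete, here is a minimal plain-Java sketch (not Flink code). It assumes tumbling windows aligned at timestamp 0 and non-negative millisecond timestamps; the Event class and the windowStart and join helpers are hypothetical names introduced only for illustration. Two records are joined only when they share both the key and the window.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a tumbling-window inner join; an illustration, not Flink code. */
class WindowJoinSketch {

    /** A keyed event with a millisecond timestamp (hypothetical helper type). */
    static final class Event {
        final String key;
        final long timestamp;
        final String payload;

        Event(String key, long timestamp, String payload) {
            this.key = key;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    /** Emits one result per (left, right) pair that shares both key and window. */
    static List<String> join(List<Event> left, List<Event> right, long windowSizeMs) {
        List<String> results = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow =
                        windowStart(l.timestamp, windowSizeMs) == windowStart(r.timestamp, windowSizeMs);
                if (sameKey && sameWindow) {
                    results.add(l.key + "\t" + l.payload + "\t" + r.payload);
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> clicks = new ArrayList<>();
        clicks.add(new Event("m1", 1_000L, "click-a"));
        clicks.add(new Event("m1", 9_000L, "click-b"));

        List<Event> orders = new ArrayList<>();
        orders.add(new Event("m1", 4_000L, "order-x"));  // window [0 s, 10 s): joins with both clicks
        orders.add(new Event("m1", 11_000L, "order-y")); // window [10 s, 20 s): joins with nothing
        orders.add(new Event("m2", 2_000L, "order-z"));  // different key: joins with nothing

        join(clicks, orders, 10_000L).forEach(System.out::println);
    }
}
```

In this run order-y arrives only 2 seconds after click-b, yet the pair is not joined because the two records fall on opposite sides of a window boundary.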

coGroup()

An inner join alone is not enough, so how do we implement a left or right outer join? The answer is the coGroup() operator. It is invoked much like join() and also requires a window, but CoGroupFunction is more flexible than JoinFunction: it can match and emit data from the left and/or right stream according to user-defined logic.
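The difference from join() can be sketched in plain Java (not Flink code). The sketch below models one window's contents: records are grouped by key, and the user logic runs once for every key present on either side, with a possibly empty group for the other side. The class and method names are hypothetical, and each record is simplified to a {key, value} string pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model of co-grouping one window's records; an illustration, not Flink code. */
class CoGroupSketch {

    /** Groups values by key; each record is a {key, value} pair. */
    static Map<String, List<String>> groupByKey(List<String[]> records) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] record : records) {
            groups.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record[1]);
        }
        return groups;
    }

    /** Left outer join: every left value is emitted, paired with null when no right value exists. */
    static List<String> leftOuterJoin(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> leftGroups = groupByKey(left);
        Map<String, List<String>> rightGroups = groupByKey(right);

        // Visit every key that appears on either side, as a co-group does.
        Set<String> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());

        List<String> out = new ArrayList<>();
        for (String key : keys) {
            List<String> lefts = leftGroups.getOrDefault(key, Collections.emptyList());
            List<String> rights = rightGroups.getOrDefault(key, Collections.emptyList());
            // From here on is the "user function": it sees both groups and decides what to emit.
            for (String l : lefts) {
                if (rights.isEmpty()) {
                    out.add(key + ": " + l + " -> null");
                }
                for (String r : rights) {
                    out.add(key + ": " + l + " -> " + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> clicks = Arrays.asList(
                new String[] {"m1", "click-a"}, new String[] {"m2", "click-b"});
        List<String[]> orders = Arrays.asList(
                new String[] {"m1", "order-x"}, new String[] {"m3", "order-y"});
        leftOuterJoin(clicks, orders).forEach(System.out::println);
    }
}
```

Swapping the roles of the two groups in the inner loops gives a right outer join, and emitting for unmatched records on both sides gives a full outer join.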

The following example implements a left outer join of the clickstream with the order stream, using a very simple nested-loop join (two nested loops).

    clickRecordStream
        .coGroup(orderRecordStream)
        .where(record -> record.getMerchandiseId())
        .equalTo(record -> record.getMerchandiseId())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        // The type parameters were lost in the source; Tuple2<String, Long> assumes getPrice() returns a Long
        .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
            @Override
            public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {
                for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
                    boolean isMatched = false;
                    for (OrderDoneLogRecord orderRecord : orderRecords) {
                        // A matching record exists in the right stream
                        collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
                        isMatched = true;
                    }
                    if (!isMatched) {
                        // No matching record in the right stream
                        collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
                    }
                }
            }
        })
        .print().setParallelism(1);

intervalJoin()

Both join() and coGroup() join on windows. In some cases, however, the two streams do not advance at the same pace. For example, an order record may be written long after the corresponding purchase click occurs, so a window boundary can easily separate the two records and the join misses them. Flink therefore also provides interval join semantics, which joins on the specified key and on a time interval by which the right stream may be offset from the left stream, that is:

right.timestamp ∈ [left.timestamp + lowerBound, left.timestamp + upperBound]

Interval join is also an inner join. It does not need a window, but the user must specify the lower and upper bounds of the offset interval, and it supports only event time.
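The interval condition can be written as a one-line predicate. The following plain-Java sketch (not Flink code) uses a hypothetical inInterval helper, treats both bounds as inclusive, and picks bounds of -30 s and +30 s purely as example values.

```java
/** Plain-Java check of the interval-join condition; an illustration, not Flink code. */
class IntervalPredicateSketch {

    /** True when rightTs lies in [leftTs + lowerBoundMs, leftTs + upperBoundMs], bounds inclusive. */
    static boolean inInterval(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }

    public static void main(String[] args) {
        long clickTs = 100_000L;   // left-stream timestamp in milliseconds
        long lower = -30_000L;     // right record may be up to 30 s earlier
        long upper = 30_000L;      // or up to 30 s later

        long[] orderTimestamps = {69_999L, 70_000L, 100_000L, 130_000L, 130_001L};
        for (long orderTs : orderTimestamps) {
            System.out.println(orderTs + " joins: " + inInterval(clickTs, orderTs, lower, upper));
        }
    }
}
```

A negative lower bound lets a right-stream record that is older than the left-stream record still join, which a pair of records split by a window boundary cannot do.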

The sample code is as follows. Note that before running it, you need to apply assignTimestampsAndWatermarks() to both streams so that they carry event timestamps and watermarks.

    clickRecordStream
        .keyBy(record -> record.getMerchandiseId())
        .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
        .between(Time.seconds(-30), Time.seconds(30))
        .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
            @Override
            public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
                // Emit the merchandise ID and the price fields as one tab-separated line
                collector.collect(StringUtils.join(Arrays.asList(
                    accessRecord.getMerchandiseId(),
                    orderRecord.getPrice(),
                    orderRecord.getCouponMoney(),
                    orderRecord.getRebateAmount()
                ), '\t'));
            }
        })
        .print().setParallelism(1);

As the example shows, interval join, unlike window join, operates on two KeyedStreams, and the between() method specifies the lower and upper bounds of the offset interval. Both bounds are inclusive by default; to make a bound exclusive, call upperBoundExclusive() / lowerBoundExclusive().

How interval join is implemented

The following is the overloaded method that KeyedStream.IntervalJoined.process(ProcessJoinFunction) delegates to.

    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
            TypeInformation<OUT> outputType) {
        Preconditions.checkNotNull(processJoinFunction);
        Preconditions.checkNotNull(outputType);

        final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
            left.getExecutionEnvironment().clean(processJoinFunction);

        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
            new IntervalJoinOperator<>(
                lowerBound,
                upperBound,
                lowerBoundInclusive,
                upperBoundInclusive,
                left.getType().createSerializer(left.getExecutionConfig()),
                right.getType().createSerializer(right.getExecutionConfig()),
                cleanedUdf
            );

        return left
            .connect(right)
            .keyBy(keySelector1, keySelector2)
            .transform("Interval Join", outputType, operator);
    }

As shown, the two streams are first combined with connect() and keyBy(), and then transformed by the IntervalJoinOperator. Inside IntervalJoinOperator, two MapState instances buffer the records of the left and right streams, respectively.

    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            LEFT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
        ));

        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            RIGHT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
        ));
    }

Here the Long key is an event timestamp, and the List<BufferEntry<T>> value holds the records that arrived with that timestamp. When a record arrives on the left or right stream, processElement1() or processElement2() is called, respectively, and both delegate to the processElement() method shown below.

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {
        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
            final long timestamp = bucket.getKey();

            // Skip buckets whose timestamp falls outside the join interval
            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry : bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

The logic of this code is as follows:

Get the timestamp of the StreamRecord from the current stream and call isLate() to determine whether the record is late (that is, its timestamp is less than the current watermark). If so, discard it.

Call addToBuffer() to insert the timestamp and the record into the MapState of the current stream.

Traverse the MapState of the other stream. For every record that satisfies the time-interval condition described earlier, call collect() to pass the pair to the user-defined ProcessJoinFunction. The code of collect() is shown below. Note that the timestamp of the result is the larger of the left and right timestamps.

    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

Call TimerService.registerEventTimeTimer() to register a timer at cleanupTime, which is ourTimestamp + relativeUpperBound when relativeUpperBound is positive and ourTimestamp otherwise. The timer cleans up state once the watermark passes the upper bound of the interval, which prevents data from accumulating. Note that the timers of the left and right streams belong to different namespaces, and the cleanup logic lives in the onEventTime() method.

    @Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {
        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound)
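The steps described above can be tied together in a small plain-Java model. This is a sketch under stated assumptions, not Flink's IntervalJoinOperator: it handles a single key, uses in-memory TreeMaps in place of MapState, takes the watermark from an explicit advanceWatermark() call, and evicts buffers eagerly instead of through timers. The eviction rule is inferred from the cleanupTime computation in processElement(); all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Single-key, in-memory model of the buffering logic; an illustration, not Flink code. */
class IntervalJoinSketch {
    private final long lowerBound;
    private final long upperBound;
    // timestamp -> records that arrived with that timestamp (stands in for MapState)
    private final Map<Long, List<String>> leftBuffer = new TreeMap<>();
    private final Map<Long, List<String>> rightBuffer = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    IntervalJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long timestamp) {
        process(value, timestamp, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long timestamp) {
        // right in [left + lower, left + upper] is the same as left in [right - upper, right - lower]
        process(value, timestamp, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long timestamp,
                         Map<Long, List<String>> ourBuffer, Map<Long, List<String>> otherBuffer,
                         long relativeLowerBound, long relativeUpperBound, boolean isLeft) {
        if (timestamp < watermark) {
            return; // step 1: drop late records
        }
        ourBuffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value); // step 2: buffer
        for (Map.Entry<Long, List<String>> bucket : otherBuffer.entrySet()) {    // step 3: probe
            long otherTimestamp = bucket.getKey();
            if (otherTimestamp < timestamp + relativeLowerBound
                    || otherTimestamp > timestamp + relativeUpperBound) {
                continue;
            }
            for (String other : bucket.getValue()) {
                String left = isLeft ? value : other;
                String right = isLeft ? other : value;
                // the result carries the larger of the two timestamps
                output.add(left + "+" + right + "@" + Math.max(timestamp, otherTimestamp));
            }
        }
    }

    /** Same formula as cleanupTime in processElement(). */
    static long cleanupTime(long timestamp, long relativeUpperBound) {
        return relativeUpperBound > 0L ? timestamp + relativeUpperBound : timestamp;
    }

    /** Step 4, simplified: evict every bucket whose cleanup time the watermark has reached. */
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        leftBuffer.keySet().removeIf(ts -> cleanupTime(ts, upperBound) <= watermark);
        rightBuffer.keySet().removeIf(ts -> cleanupTime(ts, -lowerBound) <= watermark);
    }

    /** Number of distinct timestamps still buffered across both sides. */
    int bufferedTimestamps() {
        return leftBuffer.size() + rightBuffer.size();
    }

    public static void main(String[] args) {
        IntervalJoinSketch join = new IntervalJoinSketch(-2L, 5L);
        join.processLeft("click-a", 10L);
        join.processRight("order-x", 12L);    // 12 is in [8, 15]: joins with click-a
        join.processRight("order-y", 16L);    // 16 is outside [8, 15]: no match yet
        join.processLeft("click-b", 17L);     // 16 is in [15, 22]: joins with order-y
        join.advanceWatermark(20L);           // evicts click-a, order-x and order-y
        join.processRight("order-late", 15L); // behind the watermark: dropped
        join.processRight("order-z", 21L);    // 21 is in [15, 22]: joins with click-b
        join.output.forEach(System.out::println);
    }
}
```

The mirrored bounds in processRight() are what allow one shared routine to serve both streams: a right-stream record looks for left-stream records in the interval obtained by negating and swapping the two bounds.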
