

FLINK SIDDHI ADDON study notes

2025-01-19 Update From: SLTechnology News&Howtos


Shulou (Shulou.com) 06/03 report

Siddhi is a powerful CEP (complex event processing) engine with its own DSL, rich pattern-matching features, and good extensibility. Thanks to Chen Hao for the Siddhi-Flink integration library, https://github.com/haoch/flink-siddhi. These notes introduce some of the implementation ideas behind this add-on.

Convert FLINK STREAM to SIDDHI STREAM definition

Usage: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)

registerStream obtains the type of the stream's elements through FlinkDataStream.getType() and constructs a SiddhiStreamSchema object. Based on that type information, the data type of each field is determined automatically and stored in the schema's internal fieldTypes array.
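To make the field-type capture concrete, here is a minimal sketch in plain Java. It assumes the stream carries a POJO and uses reflection on the class where the real code reads Flink's TypeInformation. `SchemaSketch` and the `Trade` event class are hypothetical names, not part of flink-siddhi.

```java
import java.lang.reflect.Field;

// Hypothetical event type used only for this sketch.
class Trade {
    public long id;
    public String symbol;
    public double price;
}

// Hypothetical stand-in for SiddhiStreamSchema: remembers the Java type of each
// named field. The real class derives these from Flink's TypeInformation instead.
final class SchemaSketch {
    final String streamName;
    final String[] fieldNames;
    final Class<?>[] fieldTypes;

    SchemaSketch(String streamName, Class<?> pojoClass, String... fieldNames) {
        this.streamName = streamName;
        this.fieldNames = fieldNames.clone();
        this.fieldTypes = new Class<?>[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            try {
                // Look up the declared field by name and record its type.
                Field field = pojoClass.getDeclaredField(fieldNames[i]);
                fieldTypes[i] = field.getType();
            } catch (NoSuchFieldException e) {
                throw new IllegalArgumentException(
                    "No field '" + fieldNames[i] + "' in " + pojoClass.getName(), e);
            }
        }
    }
}
```

For `new SchemaSketch("trades", Trade.class, "id", "symbol", "price")`, fieldTypes holds `long.class`, `String.class` and `double.class`; an unknown field name is rejected up front rather than failing later at serialization time.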

SiddhiStreamSchema internally creates a Siddhi StreamDefinition object and adds an attribute to it for each fieldName/fieldType pair. SiddhiTypeFactory.getAttributeType maps each Flink data type to the corresponding Siddhi data type, and the schema then generates the stream definition automatically (see the SiddhiStreamSchema.getStreamDefinitionExpression method):

    define stream [streamName] ([fieldName 1] [fieldType 1], ..., [fieldName n] [fieldType n])
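The type mapping and the generated expression can be sketched as follows. This is an illustration under assumptions: `DefineStreamSketch` is a hypothetical class, it works on Java classes rather than Flink TypeInformation, and it falls back to Siddhi's `object` type for anything it does not recognize. The real SiddhiTypeFactory.getAttributeType may map types differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of the type mapping plus the "define stream" expression.
final class DefineStreamSketch {
    // Java type -> SiddhiQL attribute type keyword.
    private static final Map<Class<?>, String> SIDDHI_TYPES = new HashMap<>();

    static {
        SIDDHI_TYPES.put(String.class, "string");
        SIDDHI_TYPES.put(Integer.class, "int");
        SIDDHI_TYPES.put(int.class, "int");
        SIDDHI_TYPES.put(Long.class, "long");
        SIDDHI_TYPES.put(long.class, "long");
        SIDDHI_TYPES.put(Float.class, "float");
        SIDDHI_TYPES.put(float.class, "float");
        SIDDHI_TYPES.put(Double.class, "double");
        SIDDHI_TYPES.put(double.class, "double");
        SIDDHI_TYPES.put(Boolean.class, "bool");
        SIDDHI_TYPES.put(boolean.class, "bool");
    }

    // Assumption: unknown types fall back to Siddhi's generic "object" type.
    static String attributeType(Class<?> javaType) {
        return SIDDHI_TYPES.getOrDefault(javaType, "object");
    }

    // Builds "define stream <name> (<field> <type>, ...);".
    static String defineStream(String streamName, String[] fieldNames, Class<?>[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("fieldNames and fieldTypes differ in length");
        }
        StringJoiner attributes = new StringJoiner(", ", "(", ")");
        for (int i = 0; i < fieldNames.length; i++) {
            attributes.add(fieldNames[i] + " " + attributeType(fieldTypes[i]));
        }
        return "define stream " + streamName + " " + attributes + ";";
    }
}
```

For example, `defineStream("trades", new String[] {"id", "symbol", "price"}, new Class<?>[] {long.class, String.class, double.class})` yields `define stream trades (id long, symbol string, price double);`.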

SiddhiStreamSchema also contains a StreamSerializer, which converts each object in the DataStream into the input of the Siddhi stream (an Object array):

    If the stream object is a simple (atomic) type, the object itself is put directly into the array.

    If the stream object is a Tuple, the first N values of the Tuple are put directly into the array, where N is the number of field names.

    If the stream object is a POJO or a Scala case class, the value of each attribute named in fieldNames is read from the object and put into the array.
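The three serialization cases above can be sketched roughly as follows. The sketch assumes a `java.util.List` stands in for a Flink Tuple and uses reflection for POJOs. `SerializerSketch` and `Quote` are hypothetical, and the real StreamSerializer works from Flink's type information instead.

```java
import java.lang.reflect.Field;
import java.util.List;

// Hypothetical event type used only for this sketch.
class Quote {
    String symbol = "ACME";
    double price = 9.5;
}

// Hypothetical mirror of StreamSerializer: turns one stream element into the
// Object[] that a Siddhi input handler expects.
final class SerializerSketch {
    enum Kind { ATOMIC, TUPLE, POJO }

    private final Kind kind;
    private final String[] fieldNames;

    SerializerSketch(Kind kind, String... fieldNames) {
        this.kind = kind;
        this.fieldNames = fieldNames.clone();
    }

    Object[] toArray(Object element) {
        switch (kind) {
            case ATOMIC:
                // Atomic type: the element itself is the only attribute.
                return new Object[] {element};
            case TUPLE: {
                // Tuple stand-in (a List here): copy the first N positions.
                List<?> tuple = (List<?>) element;
                Object[] row = new Object[fieldNames.length];
                for (int i = 0; i < row.length; i++) {
                    row[i] = tuple.get(i);
                }
                return row;
            }
            case POJO: {
                // POJO: read each named field reflectively, in fieldNames order.
                Object[] row = new Object[fieldNames.length];
                for (int i = 0; i < row.length; i++) {
                    row[i] = readField(element, fieldNames[i]);
                }
                return row;
            }
            default:
                throw new IllegalStateException("Unknown kind " + kind);
        }
    }

    private static Object readField(Object pojo, String name) {
        try {
            Field field = pojo.getClass().getDeclaredField(name);
            field.setAccessible(true);
            return field.get(pojo);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot read field '" + name + "'", e);
        }
    }
}
```

Note that the POJO case follows the order of fieldNames, not the declaration order in the class, so `new SerializerSketch(SerializerSketch.Kind.POJO, "price", "symbol").toArray(new Quote())` gives `[9.5, "ACME"]`.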

Connecting FLINK STREAM and SIDDHI STREAM

SiddhiStream: the abstract Stream base class.

convertDataStream converts the original Flink stream into a stream of Tuples. The first element of each Tuple is the StreamId and the second is the element from the original stream. Both ordinary DataStreams and KeyedStreams are supported.
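The tagging step can be pictured with a small sketch. It uses `AbstractMap.SimpleImmutableEntry` as a stand-in for Flink's `Tuple2<String, Object>`. The `dispatch` method is an assumption about why the StreamId is carried along: it lets the operator pick the input handler for the right Siddhi stream. `TagSketch` is a hypothetical name.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of convertDataStream: each element becomes a
// (streamId, value) pair, standing in for Flink's Tuple2<String, Object>.
final class TagSketch {
    static List<Map.Entry<String, Object>> tag(String streamId, List<?> elements) {
        List<Map.Entry<String, Object>> tagged = new ArrayList<>(elements.size());
        for (Object element : elements) {
            tagged.add(new AbstractMap.SimpleImmutableEntry<>(streamId, element));
        }
        return tagged;
    }

    // Assumption: the streamId selects which Siddhi input stream receives the value.
    static void dispatch(List<Map.Entry<String, Object>> tagged,
                         Map<String, Consumer<Object>> handlersByStreamId) {
        for (Map.Entry<String, Object> pair : tagged) {
            Consumer<Object> handler = handlersByStreamId.get(pair.getKey());
            if (handler == null) {
                throw new IllegalArgumentException("Unregistered stream: " + pair.getKey());
            }
            handler.accept(pair.getValue());
        }
    }
}
```

Carrying the id inside each element is what allows several source streams to share one operator instance after they have been merged.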

ExecutionSiddhiStream: builds a SiddhiOperatorContext and calls SiddhiStreamFactory.createDataStream to create a DataStream backed by Siddhi. The element type of this DataStream is a subclass of Tuple. That type comes from SiddhiTypeFactory.getTupleTypeInformation: it reads the attribute definitions from the StreamDefinition of Siddhi's output stream and then builds the corresponding Flink Tuple type with TypeInfoParser.parse.
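The type derivation can be sketched as string building. The sketch assumes a type string of the form `Tuple2<String, Double>` is what gets handed to TypeInfoParser.parse. `TupleTypeSketch` and its Siddhi-to-Java type table are hypothetical, and the sketch stops at the string rather than producing a Flink TypeInformation.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: derive a Flink-style tuple type string such as
// "Tuple2<String, Double>" from the attribute types of Siddhi's output stream.
final class TupleTypeSketch {
    // Siddhi attribute type keyword -> Java boxed type name (assumed mapping).
    private static final Map<String, String> JAVA_TYPES = Map.of(
        "string", "String", "int", "Integer", "long", "Long",
        "float", "Float", "double", "Double", "bool", "Boolean",
        "object", "Object");

    static String tupleType(List<String> siddhiAttributeTypes) {
        int n = siddhiAttributeTypes.size();
        // Flink's Tuple1..Tuple25 classes cover 1 to 25 fields.
        if (n < 1 || n > 25) {
            throw new IllegalArgumentException("Unsupported tuple arity " + n);
        }
        StringJoiner params = new StringJoiner(", ", "Tuple" + n + "<", ">");
        for (String siddhiType : siddhiAttributeTypes) {
            String javaType = JAVA_TYPES.get(siddhiType);
            if (javaType == null) {
                throw new IllegalArgumentException("Unknown Siddhi type " + siddhiType);
            }
            params.add(javaType);
        }
        return params.toString();
    }
}
```

For an output stream with attributes `(symbol string, avgPrice double)`, `tupleType(List.of("string", "double"))` returns `Tuple2<String, Double>`. The tuple's arity follows the number of output attributes, which is why the output type can only be fixed once the query's output StreamDefinition is known.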

ExecutableStream: creates ExecutionSiddhiStream objects from a Siddhi query.

SingleSiddhiStream, UnionSiddhiStream: subclasses of ExecutableStream that support fluent-style chained calls. UnionSiddhiStream calls the DataStream.union method to merge its input streams.
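The fluent chaining can be illustrated with a toy builder. `FluentUnionSketch` is hypothetical and not the flink-siddhi API. It mutates and returns `this` for brevity, and it models DataStream.union as simple concatenation of tagged elements. A real Flink union gives no ordering guarantee across its inputs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical fluent builder: each union() registers one more stream, and
// merged() combines every stream's (streamId, value) pairs into one input.
final class FluentUnionSketch {
    // Insertion-ordered so that streamIds() reflects the call chain.
    private final Map<String, List<?>> streams = new LinkedHashMap<>();

    static FluentUnionSketch from(String streamId, List<?> elements) {
        return new FluentUnionSketch().union(streamId, elements);
    }

    FluentUnionSketch union(String streamId, List<?> elements) {
        if (streams.putIfAbsent(streamId, elements) != null) {
            throw new IllegalArgumentException("Stream already registered: " + streamId);
        }
        return this;
    }

    List<String> streamIds() {
        return new ArrayList<>(streams.keySet());
    }

    // Stand-in for DataStream.union: concatenate all tagged inputs.
    List<Map.Entry<String, Object>> merged() {
        List<Map.Entry<String, Object>> out = new ArrayList<>();
        for (Map.Entry<String, List<?>> stream : streams.entrySet()) {
            for (Object value : stream.getValue()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(stream.getKey(), value));
            }
        }
        return out;
    }
}
```

A chain such as `FluentUnionSketch.from("a", List.of(1)).union("b", List.of(2, 3))` registers streams `a` and `b`, and `merged()` yields three tagged elements.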

SiddhiStreamFactory.createDataStream uses a custom StreamOperator, SiddhiStreamOperator, which is attached to the job through the transform method of the Flink DataStream. The setup method of AbstractSiddhiOperator creates the SiddhiManager and the SiddhiAppRuntime and registers the InputHandler and the OutputCallback (StreamOutputHandler).
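The wiring done in setup can be shown with an in-memory stand-in. Everything here is hypothetical. `FilterRuntime` plays the role of a SiddhiAppRuntime running one filter query, and its `send` method plays InputHandler.send. The callback that appends to a list plays StreamOutputHandler forwarding to the operator's Output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical operator skeleton: setup() builds the engine once and wires an
// input path plus an output callback; processElement() only forwards rows.
final class OperatorLifecycleSketch {
    // Stand-in for a Siddhi runtime executing a single filter query.
    static final class FilterRuntime {
        private final Predicate<Object[]> query;
        private Consumer<Object[]> callback = row -> { };

        FilterRuntime(Predicate<Object[]> query) {
            this.query = query;
        }

        void addCallback(Consumer<Object[]> callback) {
            this.callback = callback;
        }

        // Stand-in for InputHandler.send(Object[]).
        void send(Object[] row) {
            if (query.test(row)) {
                callback.accept(row);
            }
        }
    }

    private final List<Object[]> output = new ArrayList<>(); // stand-in for Flink's Output
    private FilterRuntime runtime;

    void setup(Predicate<Object[]> query) {
        runtime = new FilterRuntime(query);
        runtime.addCallback(output::add); // plays the role of StreamOutputHandler
    }

    void processElement(Object[] row) {
        if (runtime == null) {
            throw new IllegalStateException("setup() was not called");
        }
        runtime.send(row);
    }

    List<Object[]> emitted() {
        return output;
    }
}
```

With a query that keeps rows whose second field exceeds 10.0, sending `("A", 5.0)` and `("B", 12.5)` leaves only the `"B"` row in the emitted list. The point of the design is that the expensive engine setup happens once per operator instance, not once per element.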

SiddhiStreamOperator.processElement has to handle two scenarios:

    Flink TimeCharacteristic = ProcessingTime: the element is first converted to an Object array by the StreamSerializer and then sent straight to Siddhi for processing via InputHandler.send.

    Flink TimeCharacteristic = EventTime: each received StreamRecord is cached in an internal priorityQueue. When a watermark arrives, all StreamRecords in the priorityQueue with timestamps up to the watermark are sent to Siddhi in a single pass.
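The event-time path is essentially a reorder buffer. The sketch below assumes records carry a long timestamp and that a record is released once the watermark is greater than or equal to its timestamp, following Flink's watermark semantics. The exact comparison used in flink-siddhi may differ. `EventTimeBufferSketch` and the `sink` callback (standing in for sending to Siddhi) are hypothetical.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Sketch of event-time buffering: hold records in a priority queue ordered by
// timestamp and release them in order once a watermark has passed them.
final class EventTimeBufferSketch {
    static final class TimedRecord {
        final long timestamp;
        final Object[] row;

        TimedRecord(long timestamp, Object[] row) {
            this.timestamp = timestamp;
            this.row = row;
        }
    }

    private final PriorityQueue<TimedRecord> queue =
        new PriorityQueue<>(Comparator.comparingLong((TimedRecord r) -> r.timestamp));
    private final Consumer<TimedRecord> sink; // stand-in for sending to Siddhi

    EventTimeBufferSketch(Consumer<TimedRecord> sink) {
        this.sink = sink;
    }

    // Records may arrive out of order; just buffer them.
    void processElement(long timestamp, Object[] row) {
        queue.add(new TimedRecord(timestamp, row));
    }

    // A watermark W promises no more records with timestamp <= W, so everything
    // up to W can be emitted in timestamp order.
    void processWatermark(long watermark) {
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            sink.accept(queue.poll());
        }
    }

    int buffered() {
        return queue.size();
    }
}
```

Records arriving with timestamps 30, 10 and 20, followed by a watermark of 20, are released as 10 and then 20, while 30 stays buffered until a later watermark. That leftover record is exactly the kind of state that has to survive a checkpoint.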

StreamOutputHandler: converts each Siddhi Event into a Flink StreamRecord according to the output TypeInfo, then forwards it to the Output of SiddhiStreamOperator.
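Here is a minimal sketch of the output side. It assumes the Siddhi event's data arrives as an Object array and that the output type is described by one Java class per field. `OutputConversionSketch` is hypothetical, and the real handler builds Flink records from TypeInformation rather than checking classes like this.

```java
import java.util.Arrays;

// Hypothetical sketch of the output conversion: check each value of a Siddhi
// event against the declared output field types, then emit it as a row.
final class OutputConversionSketch {
    private final Class<?>[] outputTypes;

    OutputConversionSketch(Class<?>... outputTypes) {
        this.outputTypes = outputTypes.clone();
    }

    Object[] convert(Object[] eventData) {
        if (eventData.length != outputTypes.length) {
            throw new IllegalArgumentException(
                "Expected " + outputTypes.length + " fields, got " + eventData.length);
        }
        // Copy so that later changes to the event do not leak into the emitted row.
        Object[] row = Arrays.copyOf(eventData, eventData.length);
        for (int i = 0; i < row.length; i++) {
            // Null values pass through; non-null values must match the declared type.
            if (row[i] != null && !outputTypes[i].isInstance(row[i])) {
                throw new ClassCastException("Field " + i + " is "
                    + row[i].getClass().getName() + ", expected " + outputTypes[i].getName());
            }
        }
        return row;
    }
}
```

Checking at this boundary surfaces a mismatch between the Siddhi query's output and the declared Flink output type at the point where it happens, instead of deeper in downstream operators.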

CHECKPOINT

SiddhiStreamOperator retains two kinds of state. The first is the messages held in the priorityQueue that have not yet been sent to Siddhi because their watermark has not arrived. The second is the state of Siddhi itself, obtained through SiddhiAppRuntime.snapshot().
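The two pieces of checkpointed state can be modeled as one serializable object. This sketch uses plain Java serialization where the operator would use Flink's state backend, and it treats the Siddhi snapshot as an opaque byte array (SiddhiAppRuntime.snapshot() returns bytes). `CheckpointSketch`, `PendingRecord` and `OperatorState` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class CheckpointSketch {
    // A record still waiting in the priority queue for its watermark.
    static final class PendingRecord implements Serializable {
        private static final long serialVersionUID = 1L;
        final long timestamp;
        final Object[] row;

        PendingRecord(long timestamp, Object[] row) {
            this.timestamp = timestamp;
            this.row = row.clone();
        }
    }

    // The whole checkpoint: buffered records plus Siddhi's own opaque snapshot.
    static final class OperatorState implements Serializable {
        private static final long serialVersionUID = 1L;
        final ArrayList<PendingRecord> pending;
        final byte[] engineSnapshot;

        OperatorState(List<PendingRecord> pending, byte[] engineSnapshot) {
            this.pending = new ArrayList<>(pending);
            this.engineSnapshot = engineSnapshot.clone();
        }
    }

    static byte[] serialize(OperatorState state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static OperatorState restore(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (OperatorState) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Checkpoint refers to an unknown class", e);
        }
    }
}
```

Both parts must be saved together. If only the Siddhi snapshot were kept, records that had arrived but were still waiting for a watermark would be lost on recovery. On restore, the pending records would go back into the priority queue and the bytes back to the Siddhi runtime.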
