2025-01-19 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Siddhi is a powerful CEP engine with its own DSL, rich pattern-matching functions, and good extensibility. Thanks to Chen Hao for providing the Siddhi and Flink integration at https://github.com/haoch/flink-siddhi. This article introduces some of the implementation ideas behind that add-on.
Converting a Flink stream into a Siddhi stream definition
Usage: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)
The registerStream method obtains the type definition of the stream object through FlinkDataStream.getType and constructs a SiddhiStreamSchema object. Based on that type definition, the data type of each field is derived automatically and stored in the internal fieldTypes array.
SiddhiStreamSchema creates a Siddhi StreamDefinition object internally and adds one attribute to it for each pair of entries in fieldNames and fieldTypes. SiddhiTypeFactory.getAttributeType is responsible for mapping each Flink data type to the corresponding Siddhi data type. A define stream statement is then generated automatically (see the SiddhiStreamSchema.getStreamDefinitionExpression method):
define stream [streamName] ([fieldName 1] [fieldType 1], ..., [fieldName n] [fieldType n])
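The generation of the define stream statement can be sketched in plain Java. This is only an illustration of the idea, not the add-on's code: the class DefineStreamSketch and its two methods are hypothetical names, and the type mapping covers only the basic Siddhi attribute types (string, int, long, float, double, bool), falling back to object for everything else.

```java
import java.util.StringJoiner;

/** Hypothetical helper that mimics the idea, not the add-on's actual class. */
final class DefineStreamSketch {

    /** Maps a Java class to a Siddhi attribute type name; unknown classes become "object". */
    static String siddhiType(Class<?> javaType) {
        if (javaType == String.class) return "string";
        if (javaType == Integer.class || javaType == int.class) return "int";
        if (javaType == Long.class || javaType == long.class) return "long";
        if (javaType == Float.class || javaType == float.class) return "float";
        if (javaType == Double.class || javaType == double.class) return "double";
        if (javaType == Boolean.class || javaType == boolean.class) return "bool";
        return "object";
    }

    /** Builds "define stream name (field1 type1, ..., fieldN typeN)". */
    static String defineStream(String streamName, String[] fieldNames, Class<?>[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("fieldNames and fieldTypes must have the same length");
        }
        StringJoiner attributes = new StringJoiner(", ", "(", ")");
        for (int i = 0; i < fieldNames.length; i++) {
            attributes.add(fieldNames[i] + " " + siddhiType(fieldTypes[i]));
        }
        return "define stream " + streamName + " " + attributes;
    }
}
```

For a stream named inputStream with fields id (Integer), name (String) and price (Double), defineStream returns "define stream inputStream (id int, name string, price double)".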
SiddhiStreamSchema also includes a StreamSerializer, which converts each object in the DataStream into the input that a Siddhi stream expects (an Object array):
If the stream object is a simple (atomic) type, the object itself is put directly into the array.
If the stream object is a Tuple, the first N values of the Tuple are put directly into the array.
If the stream object is a POJO or a case class, the attribute matching each fieldName is read from the object and put into the array.
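The three cases above can be sketched without Flink on the classpath. In this sketch a java.util.List stands in for a Flink Tuple, POJO attributes are read by reflection on declared fields, and the names StreamSerializerSketch, getRow and TradeEvent are hypothetical. The real serializer presumably works from Flink's type information rather than instanceof checks.

```java
import java.lang.reflect.Field;
import java.util.List;

/** Hypothetical POJO used only for this illustration. */
final class TradeEvent {
    int id;
    String name;
    double price;

    TradeEvent(int id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }
}

/** Sketch of turning a stream element into the Object[] row handed to Siddhi. */
final class StreamSerializerSketch {
    private final String[] fieldNames;

    StreamSerializerSketch(String... fieldNames) {
        this.fieldNames = fieldNames;
    }

    Object[] getRow(Object input) {
        // Case 1: atomic value, the element itself is the only column.
        if (input instanceof String || input instanceof Number || input instanceof Boolean) {
            return new Object[] {input};
        }
        Object[] row = new Object[fieldNames.length];
        // Case 2: tuple (a List stands in for a Flink Tuple), take the first N positions.
        if (input instanceof List) {
            List<?> tuple = (List<?>) input;
            for (int i = 0; i < row.length; i++) {
                row[i] = tuple.get(i);
            }
            return row;
        }
        // Case 3: POJO, look up each declared field by its name.
        for (int i = 0; i < row.length; i++) {
            try {
                Field field = input.getClass().getDeclaredField(fieldNames[i]);
                field.setAccessible(true);
                row[i] = field.get(input);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot read field '" + fieldNames[i] + "'", e);
            }
        }
        return row;
    }
}
```

With fieldNames "id" and "name", both a three-element tuple and a TradeEvent yield a two-element row, so only the registered fields reach Siddhi.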
Chaining the Flink stream and the Siddhi stream
SiddhiStream: the abstract base class for streams.
convertDataStream converts the original Flink stream into a stream of Tuple type. The first element of the Tuple is the stream ID and the second element is the data from the original stream. Both a normal DataStream and a KeyedStream are supported.
ExecutionSiddhiStream: builds a SiddhiOperatorContext and calls SiddhiStreamFactory.createDataStream to create a DataStream that integrates Siddhi. The element type of this DataStream is a subclass of Tuple. SiddhiTypeFactory.getTupleTypeInformation: the core idea is to read the attribute definitions from the StreamDefinition of Siddhi's output stream, and then construct the matching Flink Tuple type definition through TypeInfoParser.parse.
ExecutableStream: creates ExecutionSiddhiStream objects based on a Siddhi query.
SingleSiddhiStream, UnionSiddhiStream: subclasses of ExecutableStream that support fluent-style chained calls. UnionSiddhiStream calls the DataStream.union method.
SiddhiStreamFactory.createDataStream uses a custom StreamOperator, SiddhiStreamOperator, which is attached through the transform method of the Flink DataStream. The setup method of AbstractSiddhiOperator creates the SiddhiManager and the SiddhiAppRuntime and registers the InputHandler and the OutputCallback (StreamOutputHandler).
SiddhiStreamOperator.processElement needs to deal with two scenarios:
Flink TimeCharacteristic = ProcessingTime: first call the StreamSerializer to convert the data into an Object array, then call InputHandler.send directly to hand the data to Siddhi for processing.
Flink TimeCharacteristic = EventTime: cache each received StreamRecord in the internal priorityQueue until a Watermark arrives, then send all StreamRecords in the priorityQueue whose timestamps are below the watermark to Siddhi in one pass.
StreamOutputHandler: converts each Siddhi Event into a Flink StreamRecord according to the output TypeInfo, then forwards it to the output of SiddhiStreamOperator.
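The event-time path can be illustrated with a small stand-alone sketch. EventTimeBufferSketch, TimestampedRow and the Consumer sink (standing in for InputHandler.send) are hypothetical names. Whether the add-on compares timestamps strictly is not stated here; this sketch uses <=, following Flink's convention that a watermark t asserts that no further records with timestamp <= t will arrive.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.Consumer;

/** Sketch of buffering rows by event time and flushing them when a watermark arrives. */
final class EventTimeBufferSketch {

    /** A row together with its event timestamp (stand-in for a StreamRecord). */
    static final class TimestampedRow {
        final long timestamp;
        final Object[] row;

        TimestampedRow(long timestamp, Object[] row) {
            this.timestamp = timestamp;
            this.row = row;
        }
    }

    // Ordered by timestamp so the oldest buffered row is always at the head.
    private final PriorityQueue<TimestampedRow> queue =
            new PriorityQueue<>(Comparator.comparingLong((TimestampedRow r) -> r.timestamp));

    // Stand-in for InputHandler.send: receives rows in timestamp order.
    private final Consumer<Object[]> sink;

    EventTimeBufferSketch(Consumer<Object[]> sink) {
        this.sink = sink;
    }

    /** Event-time mode: only buffer, nothing is sent yet. */
    void processElement(long timestamp, Object[] row) {
        queue.add(new TimestampedRow(timestamp, row));
    }

    /** Flushes every buffered row whose timestamp is covered by the watermark. */
    void processWatermark(long watermark) {
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            sink.accept(queue.poll().row);
        }
    }

    int buffered() {
        return queue.size();
    }
}
```

Rows that arrive out of order are therefore delivered to the sink sorted by timestamp, and anything newer than the watermark stays in the queue.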
CHECKPOINT
SiddhiStreamOperator retains two kinds of state. One is the records held in the priorityQueue that have not yet been sent to Siddhi because the corresponding watermark has not arrived. The other is the state of Siddhi itself, which is obtained through SiddhiAppRuntime.snapshot().
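As a rough illustration of checkpointing both pieces together, the sketch below writes an opaque Siddhi snapshot (a byte array) and the pending buffered records into one byte stream and reads them back. The binary layout, the class CheckpointSketch and the String payload are all assumptions made for this example. How the real operator encodes its state is not described here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;

/** Sketch: persist the engine snapshot plus the not-yet-sent records in one blob. */
final class CheckpointSketch {

    /** Stand-in for a buffered StreamRecord; the payload is simplified to a String. */
    static final class Buffered {
        final long timestamp;
        final String payload;

        Buffered(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Both pieces of state after a restore. */
    static final class Restored {
        final byte[] siddhiState;
        final PriorityQueue<Buffered> queue;

        Restored(byte[] siddhiState, PriorityQueue<Buffered> queue) {
            this.siddhiState = siddhiState;
            this.queue = queue;
        }
    }

    static byte[] snapshot(byte[] siddhiState, Collection<Buffered> pending) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(siddhiState.length);   // part 1: opaque engine snapshot
            out.write(siddhiState);
            out.writeInt(pending.size());       // part 2: records waiting for a watermark
            for (Buffered b : pending) {
                out.writeLong(b.timestamp);
                out.writeUTF(b.payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Restored restore(byte[] checkpoint) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(checkpoint))) {
            byte[] siddhiState = new byte[in.readInt()];
            in.readFully(siddhiState);
            int count = in.readInt();
            PriorityQueue<Buffered> queue =
                    new PriorityQueue<>(Comparator.comparingLong((Buffered b) -> b.timestamp));
            for (int i = 0; i < count; i++) {
                queue.add(new Buffered(in.readLong(), in.readUTF()));
            }
            return new Restored(siddhiState, queue);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On recovery, the restored byte array would be handed back to the Siddhi runtime and the restored queue would resume waiting for the next watermark.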