In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-02 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article is to share with you about how to use Apache to query Pulsar streams, the editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.
Here we will introduce the integration of Apache Pulsar and Apache Flink and the latest research and development progress, and explain in detail how to use Pulsar built-in schema to query Pulsar streams in real time using Apache Flink.
Introduction to Apache Pulsar
Apache Pulsar is a flexible publish / subscribe messaging system that supports persistent log storage. The architectural advantages of Pulsar include multi-tenancy, unified messaging model, structured event flow, cloud native architecture, etc., which make Pulsar perfectly suitable for a variety of user scenarios, from billing, payment, transaction services to the integration of different messaging architectures in the organization.
Existing Pulsar & Flink integration
(Apache Flink 1.6 +)
In the existing Pulsar and Flink integration, Pulsar is used as a message queue in Flink applications. Flink developers can select a specific Pulsar source and connect to the desired Puslar cluster and topic, using Pulsar as the stream source and stream sink for Flink:
/ / create and configure Pulsar consumerPulsarSourceBuilderbuilder = PulsarSourceBuilder .builder (new SimpleStringSchema ()) .serviceUrl (serviceUrl) .topic (inputTopic) .subsciptionName (subscription); SourceFunction src = builder.build (); / / ingest DataStream with Pulsar consumerDataStream words = env.addSource (src)
The Pulsar stream can then connect to the processing logic of the Flink.
/ / perform computation on DataStream (here a simple WordCount) DataStream wc = words .flatmap ((FlatMapFunction) (word, collector)-> {collector.collect (new WordWithCount (word, 1));})
.returns (WordWithCount.class) .keyby ("word") .timeWindow (Time.seconds (5)) .reduce ((ReduceFunction) (C1, c2)-> new WordWithCount (c1.word, c1.count + c2.count))
The data is then written out to Pulsar through sink.
/ / emit result via Pulsar producer wc.addSink (new FlinkPulsarProducer (serviceUrl, outputTopic, new AuthentificationDisabled (), wordWithCount-> wordWithCount.toString (). GetBytes (UTF_8), wordWithCount-> wordWithCount.word))
This is an important first step for integration, but the existing design is not enough to take full advantage of the full capabilities of Pulsar.
There are some shortcomings in the integration of Pulsar with Flink 1.6.0, including neither being used as persistent storage nor schema integration with Flink, resulting in manual input when adding a description for the application schema registration.
Integration of Pulsar and Flink 1.9
Use Pulsar as Flink catalog
The latest integration of Flink 1.9.0 with Pulsar solves the problems mentioned earlier. Alibaba Blink's contribution to the Flink repository not only strengthens the processing architecture, but also adds new features to make the integration of Flink and Pulsar more powerful and effective.
Flink 1.9.0:
Https://flink.apache.org/downloads.html#apache-flink-191
Pulsar schema integration is introduced into the implementation of the new connector, which increases the support for Table API and provides Pulsar reading of exactly-once semantics and Pulsar writing of at-least-once semantics.
And, with schema integration, Pulsar can be registered as Flink catalog, and Flink queries can be run on the Pulsar stream with just a few commands. Below we will describe the new integration in detail and give an example of how to query Pulsar streams using Flink SQL.
Using Flink Pulsar Schema integration
Before we expand on the integration details and specific usage, let's take a look at how Pulsar schema works.
Apache Pulsar has built-in support for Schema, eliminating the need for additional management of schema. Pulsar's data schema is associated with each topic, so both producer and consumer can send data using predefined schema information, while broker can validate schema and manage schema multiversioning and schema evolution in compatibility checks.
The following are examples of Pulsar schema for producer and consumer, respectively. On the producer side, you can specify to use schema, and Pulsar can send a POJO class without serialization / deserialization.
Similarly, on the consumer side, the data schema can also be specified, and immediately after receiving the data, Pulsar automatically validates the schema information, gets a given version of schema, and then deserializes the data into the POJO structure. Pulsar stores schema information in the metadata of topic.
/ / Create producer with Struct schema and send messagesProducer producer = client.newProducer (Schema.AVRO (User.class)) .create (); producer.newMessage () .value (User.builder () .username ("pulsar-user") .UserID (1L) .build () .send (); / / Create consumer with Struct schema and receive messagesConsumer consumer = client.newCOnsumer (Schema.AVRO (User.class)) .create (); consumer.receive ()
Suppose an application specifies schema for producer and / or consumer. When schema information is received, the producer (or consumer) connected to the broker transmits such information so that broker registers the schema, verifies the schema, and checks schema compatibility before returning or rejecting the schema, as shown in the following figure:
Pulsar can not only process and store schema information, but also handle schema evolution (schema evolution) if necessary. Pulsar can effectively manage schema evolution in broker and track all versions of schema in necessary compatibility checks.
In addition, when the message is published on the producer side, Pulsar marks the schema version in the metadata of the message; when the consumer receives the message and completes the deserialization of the metadata, Pulsar will check the schema version associated with the message and obtain the schema information from the broker.
Therefore, when Pulsar integrates with Flink applications, Pulsar uses pre-existing schema information and maps a single message with schema information to different peers in the Flink type system.
When Flink users do not interact directly with schema or do not use the original schema (primitive schema) (for example, using topic to store strings or long values), Pulsar converts messages to Flink lines, or "values"; or in structured schema types (for example, JSON and AVRO), Pulsar extracts individual field information from schema information and maps fields to Flink's type system.
Finally, all metadata information related to the message (for example, message key, topic, release time, event time, and so on) is converted to the metadata field in the Flink row. The following are two examples of using raw schema and structured schema to explain how to convert data from Pulsar topic to Flink type system.
Original schema (Primitive Schema):
Root |-- value: DOUBLE |-- _ _ key: BYTES |-- _ _ topic: STRING |-- _ _ messageId: BYTES |-- _ _ publishTime: TIMESTAMP (3) |-_ _ eventTime: TIMESTAMP (3)
Structured schema (Avor Schema):
@ Data@AllArgsConstructor@NoArgsConstructorpublic static class Foo {public int i; public float f; public Bar bar;} @ Data@AllArgsConstructor@NoArgsConstructorpublic static class Bar {public boolean b; public String s;} Schema s = Schema.AVRO (Foo.getClass ()) Root |-- I: INT |-- f: FLOAT |-- bar: ROW |-- _ _ key: BYTES |-- _ _ topic: STRING |-- _ _ messageId: BYTES |-- _ publishTime: TIMESTAMP (3) |-_ _ eventTime: TIMESTAMP (3)
When all schema information is mapped to the Flink type system, you can build a Pulsar source, sink, or catalog in Flink based on the specified schema information, as follows:
Flink & Pulsar: reading data from Pulsar
1. Create a Pulsar source for streaming queries
Val env = StreamExecutionEnvironment.getExecutionEnvironmentval props = new Properties () props.setProperty ("service.url", "pulsar://...") props.setProperty ("admin.url", "http://...")props.setProperty("partitionDiscoveryIntervalMillis"," 5000 ") props.setProperty (" startingOffsets "," earliest ") props.setProperty (" topic ") "test-source-topic") val source = new FlinkPulsarSource (props) / / you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryableval dataStream = env.addSource (source) (null)
/ / chain operations on dataStream of Row and sink the output// end method chaining
Env.execute ()
two。 Register topic in Pusar as streaming tables
Val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create (env)
Val prop = new Properties () prop.setProperty ("service.url", serviceUrl) prop.setProperty ("admin.url", adminUrl) prop.setProperty ("flushOnCheckpoint", "true") prop.setProperty ("failOnWrite", "true") props.setProperty ("topic", "test-sink-topic")
TEnv .connect (new Pulsar (). Properties (props)) .inAppendMode () .registerTableSource ("sink-table")
Val sql = "INSERT INTO sink-table." tEnv.sqlUpdate (sql) env.execute ()
Flink & Pulsar: write data to Pulsar
1. Create a Pulsar sink for streaming queries
Val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream =.
Val prop = new Properties () prop.setProperty ("service.url", serviceUrl) prop.setProperty ("admin.url", adminUrl) prop.setProperty ("flushOnCheckpoint", "true") prop.setProperty ("failOnWrite", "true") props.setProperty ("topic", "test-sink-topic")
Stream.addSink (new FlinkPulsarSink (prop, DummyTopicKeyExtractor)) env.execute ()
two。 Write streaming table to Pulsar
Val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create (env)
Val prop = new Properties () prop.setProperty ("service.url", serviceUrl) prop.setProperty ("admin.url", adminUrl) prop.setProperty ("flushOnCheckpoint", "true") prop.setProperty ("failOnWrite", "true") props.setProperty ("topic", "test-sink-topic")
TEnv .connect (new Pulsar (). Properties (props)) .inAppendMode () .registerTableSource ("sink-table")
Val sql = "INSERT INTO sink-table." tEnv.sqlUpdate (sql) env.execute ()
In the above example, Flink developers do not have to worry about schema registration, serialization / deserialization, and register the Pulsar cluster as source, sink, or streaming table in Flink.
When these three elements exist at the same time, Pulsar will be registered as catalog in Flink, which can greatly simplify data processing and query, such as writing programs to query data from Pulsar, querying Pulsar data streams using Table API and SQL, and so on.
The above is how to use Apache to query the Pulsar stream. The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.