In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)06/01 Report--
This article introduces the relevant knowledge of "the basic use of ksqlDB". In the operation of actual cases, many people will encounter such a dilemma. Next, let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
Basic Concepts ksqlDB Server
KsqlDB is an event flow database and a special database. The real-time data stream processing engine based on Kafka provides a powerful and easy-to-use SQL interaction way to deal with Kafka data streams without writing code. KSQL has excellent features such as high scalability, high elasticity, and fault tolerance, and it provides a wide range of streaming operations, such as data filtering, transformation, aggregation, connecting join, windowing, and Sessionization (that is, capturing all flow events during a single session).
KsqlDB CLI
The KSQL command line interface (CLI) interactively writes KSQL queries. KSQL CLI acts as a client to KSQL Server.
Event (Event)
KsqlDB is designed to increase abstraction by using lower-level stream processors. Typically, an event is called a "row" as if it were a row in a relational database.
Stream (Stream)
A stream represents a partitioned, immutable, and appendable collection of a series of historical data. Once a row is inserted into the stream, it cannot be changed. You can add a new row at the end of the stream, but you can never update or delete an existing row. Each row of data is stored in a specific partition, each row implicitly or explicitly has a key that represents its identity, and all rows with the same key are in the same partition.
Table (Table)
A table is a collection of mutable partitions whose contents change over time. The stream represents the historical sequence of events, whereas the table represents the current real situation. The table works by using the keys of each row. If a row sequence shares a key, the last line of the given key represents the latest information identified by the key, and the background process runs periodically and deletes all rows except the latest line.
Give examples to illustrate
Suppose users Alice and Bob started with $200 and $100, respectively, and went through the following series of transactions:
Alice transfers $100to Bob.
Bob transfers $50 to Alice.
Bob transfers $100to Alice.
In the example, Stream represents the history of the transfer of funds from one account to another, and Table reflects the latest status of each user account. So we conclude that Table will have the current status of the account, while Stream will capture transactions.
Stream can be thought of as a change log for Table because updating the aggregation of Stream over time results in a table. You can think of a Table at some point as a snapshot of the latest value of each key in the Stream (the data record of the stream is a key-value pair), and observing the changes in Table over time produces a Stream.
Docker deployment ksqlDB
Create a docker-compose.yaml file containing ksqlDB Server and ksqlDB Cli:
-version: '2'services: ksqldb-server: image: confluentinc/ksqldb-server:0.15.0 hostname: ksqldb-server container_name: ksqldb-server ports:-"8088 kafka 8088" environment: KSQL_LISTENERS: http://0.0.0.0:8088 KSQL_BOOTSTRAP_SERVERS: 192.168.1.87 KSQL_BOOTSTRAP_SERVERS 9092 # address of the kafka cluster to be connected KSQL_KSQL_LOGGING_ PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" ksqldb-cli: image: confluentinc/ksqldb-cli:0.15.0 container_name: ksqldb-cli depends_on:-ksqldb-server entrypoint: / bin/sh tty: true
Start with the docker-compose up-d command, and then connect to ksql with the following command:
Docker exec-it ksqldb-cli ksql http://ksqldb-server:8088OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. = _ = = | | | _ | | _ | _) = = | | / / _ _ | / _ `| _\ = = | Producer code package tuling.kafkaDemo Import com.alibaba.fastjson.JSON;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;public class MsgProducer {private final static String TOPIC_NAME = "cr7-topic"; public static void main (String [] args) throws InterruptedException, ExecutionException {Properties props = new Properties () Props.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092"); / / props.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.87 purl 9092192.168.1.88); / / props.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG," 11.8.36.125 purge 9092 parentin 11.8.38.116 props.put 9092 maxim 11.8.38.120,9092 ") / * sending message persistence mechanism parameter (1) acks=0: indicates that producer does not need to wait for any broker to confirm that it has received a reply before it can send the next message. The highest performance, but the most likely to lose messages. (2) acks=1: at least wait for leader to successfully write data to the local log, but do not wait for all follower to be written successfully. You can move on to the next message. In this case, if the follower does not back up the data successfully and the leader dies, the message will be lost. (3) acks=-1 or all: you need to wait for min.insync.replicas (default is 1, recommended configuration is greater than or equal to 2). The number of copies configured for this parameter is successfully written to the log. This policy ensures that as long as one backup survives, no data will be lost. This is the strongest data guarantee. This configuration is generally used unless it is at the financial level or in scenarios where money is being dealt with. * / props.put (ProducerConfig.ACKS_CONFIG, "1") / * if it fails to send, it will be retried. The default retry interval is 100ms. Retry can guarantee the reliability of message delivery, but it may also cause message to be sent repeatedly, such as network jitter. Therefore, idempotent processing of message reception needs to be done at the receiver's side * / props.put (ProducerConfig.RETRIES_CONFIG, 3). / / set the retry interval to props.put (ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); / / set the local buffer for sending messages. If this buffer is set, messages will be sent to the local buffer first, which can improve message sending performance. The default value is 33554432, that is, 32MB props.put (ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432). / * the kafka local thread takes data from the buffer, sends it to broker in batches, and sets the size of the batch message. The default value is 16384, that is, 16kb, which means that a batch will be sent out when the 16kb is full * / props.put (ProducerConfig.BATCH_SIZE_CONFIG, 16384) / * the default value is 0, which means that the message must be sent immediately, but this will affect the performance of about 10 milliseconds, that is, after the message is sent, it will enter a local batch. If the batch is full within 10 milliseconds, the 16kb will be sent along with the batch. If the batch is not full within 10 milliseconds, then the message must also be sent. Do not allow messages to be sent for too long a delay * / props.put (ProducerConfig.LINGER_MS_CONFIG, 10) / serialize the key of the sending message from a string to a byte array props.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName ()); / / serialize the sending message value from a string to a byte array props.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName ()); / / create a Kafka consumer instance Producer producer = new KafkaProducer (props) Int msgNum = 50; final CountDownLatch countDownLatch = new CountDownLatch (msgNum); for (int I = 1; I PRINT 'cr7-topic' FROM BEGINNING limit 5) Key format: JSON or KAFKA_STRINGValue format: JSON or KAFKA_STRINGrowtime: 2021-02-27 16 key: 2, value: {"orderAmount": 1000, "orderId": 2, "productId": 102,102, "productNum": 1}, partition: 2rowtime: 2021-02-27 16 2rowtime 1146.239 Z, key: 3, value: {"orderAmount": 1000, "orderId": 3, "productId": 1000, "productNum": 1}, partition: 2rowtime: 2021-02-27 16key 1146.240 Z Key: 9, value: {"orderAmount": 1000, "orderId": 9, "productId": 109, "productNum": 1}, partition: 2rowtime: 2021-02-27 16 orderAmount 11V 46.241Z, key: 16, value: {"orderAmount": 1000, "orderId": 16, "productId": 116,116, "productNum": 1}, partition: 2rowtime: 2021-02-27 16 Suzhou 11141Z, key: 29, value: {"orderAmount": 1000, "orderId": 29, "productId": 129,129 "productNum": 1}, partition: 2 create Stream
Create a Stream based on the topic named cr7-topic. Note that the name of the Stream cannot have -:
Ksql > CREATE STREAM cr7_topic_stream (orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER) WITH (kafka_topic='cr7-topic',value_format='json'); Message- Stream created- lists all Streamksql > list streams Stream Name | Kafka Topic | Key Format | Value Format | Windowed- CR7_TOPIC_STREAM | | cr7-topic | KAFKA | JSON | false queries Stream data |
When you run the Producer program, you can see that the data is output continuously:
Ksql > select * from CR7_TOPIC_STREAM EMIT CHANGES +-- + | ORDERAMOUNT | | ORDERID | PRODUCTID | PRODUCTNUM | +-- -+ | 1000 | 4 | 104 | 1 | | 1000 | 6 | 106 | 1 | | 1000 | 10 | 110 | 1 | | 1000 | 12 | 112 | 1 | | 1000 | 13 | | | 1000 | 1 | 1000 | 14 | 114 | 1 | | 1000 | 18 | 118 | 1 | | | 1000 | 19 | 119 | 1 | | 1000 | 20 | 1 | 1 | 1000 | | | 24 | 124 | 1 | | 1000 | 26 | 126 | 1 | | 1000 | 31 | 131 | | | 1 | | 1000 | 35 | 1 | | 1000 | 38 | 138 | 1 | | | 1000 | 39 | 139 | 1 | | 1000 | 42 | 142 | 1 | | 1000 | 46 | | | 1000 | 1 | 1000 | 1 | 101 | 1 | | 1000 | 5 | 105 | 1 | | | 1000 | 7 | 107 | 1 | | 1000 | 8 | 108 | 1 | 1000 | | | 11 | 111 | 1 | | 1000 | 15 | 115 | 1 | | 1000 | 17 | 117 | | | 1 | | 1000 | 21 | 121 | 1 | | 1000 | 22 | 122 | 1 | | | 1000 | 23 | 123 | 1 | | 1000 | 25 | 125 | 1 | | 1000 | 2 | | | 1000 | 1 | 1000 | 3 | 103 | 1 | create another Stream via Stream |
Write the data with singular orderid in Stream cr7_topic_stream to the new Stream S3:
Ksql > CREATE STREAM S3 AS SELECT * FROM cr7_topic_streamWHERE (orderid%2)! = 0 EMIT CHANGES
Looking at Stream S3, you can see that only orderid is singular:
Ksql > select * from S3 emit changes +-- + | ORDERAMOUNT | | ORDERID | PRODUCTID | PRODUCTNUM | +-- -+ | 1 | 1000 | 1 | 1000 | 5 | 105 | 1 | | 1000 | 7 | 107 | 1 | | 1000 | 11 | 111 | 1 | | 1000 | 15 | | | 1000 | 1 | 1000 | 17 | 117 | 1 | | 1000 | 21 | 121 | 1 | | | 1000 | 23 | 123 | 1 | | 1000 | 25 | 125 | 1 | | 1000 | | | 27 | 127 | 1 | | 1000 | 33 | 133 | 1 | | 1000 | 37 | 137 | | | 1 | | 1000 | 43 | 1 | | 1000 | 45 | 145 | 1 | | | 1000 | 47 | 1 | | 1000 | 13 | 113 | 1 | | 1000 | 19 | | | 1 | 1 | 1000 | 31 | 131 | 1 | | 1000 | 35 | 135 | 1 | | 1000 | 39 | 139 | 1 | | 1000 | 3 | 103 | 1 | query on Stream data aggregation |
Query the total number of entries and the sum of orderamount in Stream cr7_topic_stream, grouped by productnum:
Ksql > SELECT COUNT (*), SUM (orderamount) from cr7_topic_stream GROUP BY productnum EMIT CHANGES +-+ | KSQL_COL_ 0 | KSQL_COL_1 | +-+- -+ | 50 | 50000 insert data ksql > INSERT INTO cr7_topic_stream (orderId) into the manual Stream ProductNum) values (777507777) View Stream data structure ksql > describe cr7_topic_stream;Name: CR7_TOPIC_STREAM Field | Type--- ORDERAMOUNT | INTEGER ORDERID | INTEGER PRODUCTID | INTEGER PRODUCTNUM | INTEGER---For runtime statistics and query details run: DESCRIBE EXTENDED
View the details on the EXTENDED parameter:
Ksql > describe extended cr7_topic_stream;Name: CR7_TOPIC_STREAMType: STREAMTimestamp field: Not set-using Key format: KAFKAValue format: JSONKafka topic: cr7-topic (partitions: 3, replication: 3) Statement: CREATE STREAM CR7_TOPIC_STREAM (ORDERAMOUNT INTEGER, ORDERID INTEGER, PRODUCTID INTEGER, PRODUCTNUM INTEGER) WITH (KAFKA_TOPIC='cr7-topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON') Field | Type--- ORDERAMOUNT | INTEGER ORDERID | INTEGER PRODUCTID | INTEGER PRODUCTNUM | INTEGER---Sources that have a DROP constraint on this source -S3Local runtime statistics---- (Statistics of the local KSQL server interaction with the Kafka topic cr7-topic) Delete StreamDROP STREAM cr7_topic_stream Create Table
Must contain the primary key, which is the key specified when the Kafka producer produces the message.
Ksql > CREATE TABLE cr7_topic_table (orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER, kafkaProducerKey VARCHAR PRIMARY KEY) WITH (kafka_topic='cr7-topic',value_format='json')
The kafka script production message specifies the method of key:
# use a comma as the separator between key and value. Kafka-console-producer.sh-- broker-list kafka1:9092-- topic cr7-topic-- property parse.key=true-- property key.separator=, > mykey, {"orderAmount": 1000, "orderId": 1, "productId": 101, "productNum": 1} View Table information ksql > describe cr7_topic_table Name: CR7_TOPIC_TABLE Field | Type- ORDERAMOUNT | INTEGER ORDERID | INTEGER (primary key) PRODUCTID | INTEGER PRODUCTNUM | INTEGER-- -For runtime statistics and query details run: DESCRIBE EXTENDED Ksql > query Tableksql > select * from cr7_topic_table emit changes +-+ | KAFKAPRODUCERKEY | ORDERAMOUNT | ORDERID | | PRODUCTID | PRODUCTNUM | +-| -+ | 1 | 1000 | 1 | 1 | 2 | 1000 | 2 | 2 | 2 | 3 | 1000 | | 3 | 103 | 3 |. # when the producer re-produces data | Put # Order order = new Order (I, 100 + I, 1, 1000.00) in the Java code Modify to # Order order = new Order (I, 100 + I, 1, 2000.00) # when the key value is the same Check that cr7_topic_table will be the latest value | 2 | 2000 | 2 | 102 | 2 | 3 | 2000 | 3 | 3 | 1 | 2000 | | 1 | 101 | 1 |. This is the end of the content of "basic usage of ksqlDB". Thank you for your reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.