Updated 2025-02-25 · From: SLTechnology News&Howtos (shulou) > Development
This article explains in detail how to use Kafka in Java. I found it very practical, so I am sharing it here for reference. I hope you get something out of it.
Preface
Official document: http://kafka.apache.org/
Chinese document: https://kafka.apachecn.org/
Apache Kafka is a distributed publish-subscribe messaging system.
Compared with traditional messaging systems, Apache Kafka differs in the following ways:
It is designed as a distributed system and is easy to scale out.
It provides high throughput for both publishing and subscribing.
It supports multiple subscribers and automatically rebalances consumers when one of them fails.
It persists messages to disk, so it can be used for batch consumption (such as ETL) as well as for real-time applications.
1 Brief introduction
First, some concepts:
Kafka runs as a cluster on one or more servers. Kafka categorizes the stored streams of records by topic. Each record consists of a key, a value, and a timestamp.
Kafka has four core APIs:
The Producer API allows an application to publish a stream of records to one or more Kafka topics.
The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
The Streams API allows an application to act as a stream processor, consuming input streams from one or more topics and producing output streams to one or more topics, effectively transforming the input streams into output streams.
The Connector API allows you to build and run reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
Besides Java, Kafka clients are available in many other languages.
Common concepts:
1 Topics and logs
Let's first take a closer look at the core abstraction Kafka provides for a stream of records: the topic.
A topic is a category of data, the place where records are published, and can be used to separate business systems. Topics in Kafka are always multi-subscriber: a topic can have zero, one, or many consumers subscribing to its data.
For each topic, the Kafka cluster maintains a partitioned log.
Each record in a partition is assigned a sequential id number called the offset, which uniquely identifies the record within the partition.
The Kafka cluster retains all published records, whether or not they have been consumed, for a configurable retention period. For example, if the retention policy is set to two days, a record can be consumed at any time within two days of being published; after that it is discarded to free disk space. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is not a problem.
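To make offsets and retention concrete, here is a minimal sketch of a single partition as an in-memory append-only log. It is a toy model only, not Kafka's storage engine or client API: the class `PartitionLog`, its nested `Entry` type, and the method names are hypothetical and exist purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single partition: an append-only log with sequential offsets. */
class PartitionLog {
    /** One stored record: its offset, publish time in milliseconds, and value. */
    static final class Entry {
        final long offset;
        final long timestampMs;
        final String value;

        Entry(long offset, long timestampMs, String value) {
            this.offset = offset;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private long nextOffset = 0;

    /** Appends a record and returns the offset assigned to it. */
    long append(String value, long timestampMs) {
        long offset = nextOffset++;
        entries.add(new Entry(offset, timestampMs, value));
        return offset;
    }

    /** Discards records older than the retention period, consumed or not. */
    void enforceRetention(long nowMs, long retentionMs) {
        entries.removeIf(e -> nowMs - e.timestampMs > retentionMs);
    }

    /** Returns the retained records whose offset is at least fromOffset, in log order. */
    List<Entry> readFrom(long fromOffset) {
        List<Entry> out = new ArrayList<>();
        for (Entry e : entries) {
            if (e.offset >= fromOffset) {
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        PartitionLog log = new PartitionLog();
        log.append("first", 0);
        log.append("second", 2 * day);
        // With a two-day retention period, "first" is three days old and is dropped.
        log.enforceRetention(3 * day, 2 * day);
        for (Entry e : log.readFrom(0)) {
            System.out.println(e.offset + " -> " + e.value);
        }
    }
}
```

Note that offsets are never reused in this model: after the first record expires, the surviving record still has offset 1, which is why a consumer can resume from a remembered offset.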
Partitions in the log serve several purposes. First, they allow the log to scale beyond a size that fits on a single server. Each individual partition must fit on the server that hosts it, but a topic may have many partitions, so it can handle an arbitrary amount of data. Second, partitions act as the unit of parallelism, as described in more detail below.
2 Distribution
The partitions of the log are distributed over the servers in the Kafka cluster, with each server handling data and requests for its share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
Each partition has one server that acts as the "leader" and zero or more servers that act as "followers". The leader handles all read and write requests for the partition, while the followers passively replicate the leader's data. If the leader goes down, one of the followers automatically becomes the new leader. Each server acts as the leader for some partitions and a follower for others, so load is well balanced within the cluster.
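The leader and follower relationship can be sketched with a small model of one partition's replica set. This is an illustration under simplifying assumptions, not Kafka's actual election algorithm: the class `ReplicaSet` is hypothetical, broker ids are plain integers, and the new leader is simply the first surviving replica in list order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the replicas of one partition; broker ids are plain integers. */
class ReplicaSet {
    private final List<Integer> inSync; // replicas that are alive and caught up
    private int leader;

    /** The first replica in the list starts out as the leader. */
    ReplicaSet(List<Integer> replicas) {
        if (replicas.isEmpty()) {
            throw new IllegalArgumentException("at least one replica is required");
        }
        this.inSync = new ArrayList<>(replicas);
        this.leader = replicas.get(0);
    }

    int leader() {
        return leader;
    }

    List<Integer> inSync() {
        return new ArrayList<>(inSync);
    }

    /** Removes a failed broker; if it was the leader, promotes a surviving follower. */
    void brokerFailed(int brokerId) {
        inSync.remove(Integer.valueOf(brokerId));
        if (brokerId != leader) {
            return; // a follower failed, the leader keeps serving requests
        }
        if (inSync.isEmpty()) {
            throw new IllegalStateException("no in-sync replica left to lead");
        }
        leader = inSync.get(0);
    }

    public static void main(String[] args) {
        ReplicaSet partition = new ReplicaSet(Arrays.asList(1, 0, 2));
        partition.brokerFailed(1);
        System.out.println("leader=" + partition.leader() + " inSync=" + partition.inSync());
    }
}
```

With replicas 1, 0, 2 and broker 1 failing, the model promotes broker 0 and leaves brokers 0 and 2 in sync; reads and writes continue as long as at least one replica survives.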
3 Producers
Producers publish data to the topics of their choice. The producer is responsible for choosing which partition within the topic each record is assigned to. This can be done round-robin simply to balance load, or according to some semantic partitioning function (for example, based on a key in the record). More on the use of partitioning below.
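The two partitioning strategies can be sketched as follows. This is a simplified stand-in rather than Kafka's built-in partitioner: the class `SimplePartitioner` is hypothetical, and `String.hashCode()` is used here only as an assumed placeholder for whatever hash function the real client applies to the key.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy partitioner: keyed records hash to a fixed partition, unkeyed records go round-robin. */
class SimplePartitioner {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();

    SimplePartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Returns a partition index in the range [0, numPartitions). */
    int partitionFor(String key) {
        if (key == null) {
            // No key: spread records evenly for load balancing.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // Same key, same partition, so records for one key stay in order.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        SimplePartitioner partitioner = new SimplePartitioner(4);
        System.out.println("user-1 -> " + partitioner.partitionFor("user-1"));
        System.out.println("user-1 -> " + partitioner.partitionFor("user-1"));
        System.out.println("no key -> " + partitioner.partitionFor(null));
        System.out.println("no key -> " + partitioner.partitionFor(null));
    }
}
```

`Math.floorMod` is used instead of `%` so that keys with a negative hash code still map to a valid partition index.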
4 Consumers
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can run in separate processes or on separate machines.
If all consumer instances are in different consumer groups, each record is broadcast to all consumer processes.
For example, consider a Kafka cluster with two servers hosting four partitions (P0-P3) and two consumer groups: consumer group A has two consumers and consumer group B has four.
Typically, each topic has a small number of consumer groups, one for each "logical subscriber". A consumer group consists of many consumer instances, for scalability and fault tolerance. This is nothing more than publish-subscribe semantics in which the subscriber is a cluster of consumers rather than a single process.
Kafka implements consumption by dividing the partitions in the log among the consumer instances so that each instance is the exclusive consumer of its share of the partitions at any point in time. Group membership is maintained dynamically by the Kafka protocol. If new instances join the group, they take over some partitions from other members of the group; if an instance dies, its partitions are distributed to the remaining instances.
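The way partitions are divided within one consumer group can be sketched with a simple round-robin assignment. This is only an illustration of the idea that every partition has exactly one owner in the group; it is not the assignment strategy Kafka itself uses, and the class `GroupAssignment` and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy assignment of partitions to the consumers of a single group. */
class GroupAssignment {
    /**
     * Spreads partitions 0..numPartitions-1 over the consumers round-robin,
     * so that each partition is owned by exactly one consumer in the group.
     * Consumer names are assumed to be unique.
     */
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return result;
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = consumers.get(p % consumers.size());
            result.get(owner).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // A group with two consumers sharing four partitions.
        System.out.println(assign(Arrays.asList("c1", "c2"), 4));
        // One consumer left the group: recomputing hands all partitions to the survivor.
        System.out.println(assign(Arrays.asList("c1"), 4));
    }
}
```

Recomputing the assignment whenever the member list changes mirrors the rebalancing described above: a joining instance takes over some partitions, and a departing instance's partitions go to the remaining members. A consumer beyond the number of partitions would receive an empty list and sit idle.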
Kafka only guarantees the order of records within a partition, not across the different partitions of a topic. Per-partition ordering, combined with the ability to partition data by key, is sufficient for most applications. If you need a total order over all records, you can use a topic with only one partition, although this means there can be only one consumer process per consumer group.
Guarantees
At a high level, Kafka gives the following guarantees:
Messages sent by a producer to a particular topic partition are appended in the order they are sent. That is, if records M1 and M2 are sent by the same producer and M1 is sent first, M1 has a lower offset than M2 and appears earlier in the log.
A consumer instance sees records in the order in which they are stored in the log.
For a topic with replication factor N, up to N-1 server failures are tolerated without losing any records committed to the log.
More details on these guarantees can be found in the design section of the official documentation.
2 Download and install
Download or installation links:
JDK 1.8: https://www.yisu.com/article/229780.htm
Zookeeper: https://www.yisu.com/article/229783.htm
Kafka: https://kafka.apachecn.org/downloads.html
Okay, let's start with the installation.
[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# tar -zxf kafka_2.11-1.0.0.tgz
[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# mv kafka_2.11-1.0.0 kafka-2.11
3 Basic use
3.1 Start Kafka
First check whether your jdk is installed:
[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# java -version
java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
Start Zookeeper:
[root@iZ2ze4m2ri7irkf6h7n8zoZ zookeeper-3.5.9]# ls
bin  conf  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.txt
[root@iZ2ze4m2ri7irkf6h7n8zoZ zookeeper-3.5.9]# cd conf/
[root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# cp zoo_sample.cfg zoo.cfg
[root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# cd ../bin/
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ls
README.txt    zkCli.cmd  zkEnv.cmd  zkServer.cmd            zkServer.sh          zkTxnLogToolkit.sh
zkCleanup.sh  zkCli.sh   zkEnv.sh   zkServer-initialize.sh  zkTxnLogToolkit.cmd
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.9/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Start Kafka:
[root@iZ2ze4m2ri7irkf6h7n8zoZ kafka-2.11]# ls
bin  config  libs  LICENSE  NOTICE  site-docs
[root@iZ2ze4m2ri7irkf6h7n8zoZ kafka-2.11]# cd config/
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# ls
connect-console-sink.properties    connect-file-source.properties  log4j.properties       zookeeper.properties
connect-console-source.properties  connect-log4j.properties        producer.properties
connect-distributed.properties     connect-standalone.properties   server.properties
connect-file-sink.properties       consumer.properties             tools-log4j.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cd ../bin/
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server.properties
[2021-11-20 10:21:10,326] INFO KafkaConfig values:
...
[2021-11-20 10:21:12,423] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-20 10:21:12,423] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-20 10:...] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
3.2 Simple test
Create and view a topic:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ymx
Created topic "ymx".
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
ymx
The producer sends a message:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic ymx
>Hello Kafka!
>Hello Ymx!
>Hello Kafka and Ymx!
>
The consumer consumes the messages:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ymx --from-beginning
Hello Kafka!
Hello Ymx!
Hello Kafka and Ymx!
3.3 Set up a multi-broker cluster
3.3.1 Start building
First, copy the configuration file and edit each copy:
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cp server.properties server-01.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cp server.properties server-02.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# vim server-01.properties
# ---- content start ----
broker.id=1                    # around line 21: unique ID of the broker (within the same cluster)
listeners=PLAINTEXT://:9093    # around line 31: uncomment; the port Kafka listens on
log.dirs=/tmp/kafka-logs-01    # around line 60: comma-separated list of directories in which to store log files
# ---- content end ----
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# vim server-02.properties
# ---- content start ----
broker.id=2                    # around line 21: unique ID of the broker (within the same cluster)
listeners=PLAINTEXT://:9094    # around line 31: uncomment; the port Kafka listens on
log.dirs=/tmp/kafka-logs-02    # around line 60: comma-separated list of directories in which to store log files
# ---- content end ----
Start Kafka with the new configuration files (on the same host):
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
Error message:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/kafka-2.11/bin/hs_err_pid4036.log
Reason: the physical or virtual machine does not have enough memory to satisfy the amount Kafka requests when it starts or runs (here the JVM failed to map 1073741824 bytes, that is, 1 GB).
Solution:
Increase the memory of the physical machine or virtual machine, or
Reduce the memory Kafka requests at startup. The file to modify is kafka-server-start.sh:
export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M"    # around line 29
3.3.2 Use
After the problem is solved, we start:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
[2021-11-20 10:58:33,138] INFO KafkaConfig values:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-02.properties
[2021-11-20 10:59:04,187] INFO KafkaConfig values:
PS: take a look at the status of our Alibaba Cloud server.
Next, create a topic with a replication factor of 3 and describe it:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mr-yan
Created topic "mr-yan".
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: mr-yan  Partition: 0  Leader: 1  Replicas: 1,0,2  Isr: 1,0,2
PartitionCount: the number of partitions in the topic.
ReplicationFactor: the number of replicas configured for the topic.
Leader: the node responsible for all reads and writes for the given partition. Each node is the leader for a randomly selected portion of the partitions.
Replicas: the list of nodes that replicate the log for this partition, regardless of whether they are the leader or even currently alive.
Isr: the set of "in-sync" replicas, that is, the subset of the replicas list that is currently alive and caught up to the leader.
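As a rough illustration of how these fields relate, the sketch below parses one partition line and reports which replicas are assigned but not in sync. It assumes a line of the form `Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2`; the exact output format can vary between Kafka versions, and the class `PartitionDescription` is a hypothetical helper, not part of any Kafka library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Parsed view of one partition line from a topic description (assumed format). */
class PartitionDescription {
    private static final Pattern LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*(-?\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    final int partition;
    final int leader;
    final List<Integer> replicas;
    final List<Integer> isr;

    private PartitionDescription(int partition, int leader, List<Integer> replicas, List<Integer> isr) {
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.isr = isr;
    }

    /** Extracts partition, leader, replicas, and ISR from a single line of text. */
    static PartitionDescription parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("not a partition line: " + line);
        }
        return new PartitionDescription(
                Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                parseIds(m.group(3)),
                parseIds(m.group(4)));
    }

    /** Turns a comma-separated id list such as "1,0,2" into integers, skipping empty parts. */
    private static List<Integer> parseIds(String csv) {
        List<Integer> ids = new ArrayList<>();
        for (String part : csv.split(",")) {
            if (!part.isEmpty()) {
                ids.add(Integer.parseInt(part));
            }
        }
        return ids;
    }

    /** Replicas that are assigned to the partition but missing from the ISR. */
    List<Integer> outOfSync() {
        List<Integer> missing = new ArrayList<>(replicas);
        missing.removeAll(isr);
        return missing;
    }

    public static void main(String[] args) {
        PartitionDescription d = parse("Topic: mr-yan Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2");
        System.out.println("leader=" + d.leader + " outOfSync=" + d.outOfSync());
    }
}
```

A non-empty `outOfSync()` result means the partition is under-replicated: the replica is still assigned to the partition but is either down or lagging behind the leader.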
Using the cluster:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan
>Hello Kafkas!
>Hello Mr.Yan
>
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan
Hello Kafkas!
Hello Mr.Yan
3.3.3 Verify fault tolerance
First, let's stop one of the Kafka brokers:
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# ps -ef | grep server-01.properties
root 19859 28247 1 10:58 pts/3 ... ../config/server-01.properties
root 23934 16569 0 11:12 pts/11 00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# kill -9 28247
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# ps -ef | grep server-01.properties
root 32604 16569 0 11:13 pts/11 00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# cd /usr/local/kafka-2.11/bin/
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: mr-yan  Partition: 0  Leader: 0  Replicas: 1,0,2  Isr: 0,2
The leader has moved from broker 1 to broker 0, and broker 1 has dropped out of the Isr list. Now use the producer and consumer again: both still work.
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan
>Hello Kafkas!
>Hello Mr.Yan
>[2021-11-20 11:12:28,881] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
>Hello Kafkas too!
>Hello Mr.Yan too!
>
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan
Hello Kafkas!
Hello Mr.Yan
[2021-11-20 11:12:...] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-11-20 11:12:...] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Hello Kafkas too!
Hello Mr.Yan too!
4 The concepts of topic, partition, and replica
Kafka organizes messages by topic, but it also has the concepts of partition and replica, which are explained below:
Replica: the replicas of a partition all hold the same data, and they stand in a "one leader, many followers" relationship. The leader serves all reads and writes, while the followers only synchronize data from the leader. When the leader goes down, a new leader is elected from among the followers so that the partition keeps serving requests.
Kafka guarantees that messages within a single partition are ordered, but it does not guarantee ordering across the partitions of a topic.
That concludes this article on how to use Kafka in Java. I hope it has been helpful; if you found it useful, please share it so more people can see it.