In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-05 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
[TOC]
Overview 1.1.What is kafka
Kafka is a distributed message queue developed in the Scala language. Kafka classifies messages according to Topic when they are saved, the sender is called Producer, and the message receiver is called Consumer. In addition, the kafka cluster is composed of multiple kafka instances, and each instance (server) becomes broker. Both kafka clusters and producer and consumer rely on zookeeper clusters to store some meta information to ensure system availability.
1.2 two working modes of message queuing
(1) the peer-to-peer model (similar to accepting files, one-to-one, where consumers actively pull data and clear messages after receiving messages) is usually a messaging model based on pull or polling, which requests information from the queue instead of pushing the message to the client. The characteristic of this model is that messages sent to the queue are received and processed by one and only one receiver, even if there are multiple message listeners.
(2) publish / subscribe mode (similar to official account, one-to-many, push to all subscribers after data production)
The publish-subscribe model is a push-based messaging model. The publish-subscribe model can have many different subscribers, temporary subscribers receive messages only when actively listening to topics, while persistent subscribers listen for all messages on topics, even if the current subscriber is unavailable and offline.
1.3 Why message queuing is needed
1) decoupling:
Allows you to extend or modify processes on both sides independently, as long as you make sure they follow the same interface constraints.
2) redundancy:
Message queues persist data until they have been fully processed, avoiding the risk of data loss. In the insert-get-delete paradigm used by many message queues, before deleting a message from the queue, your processing system needs to clearly indicate that the message has been processed to ensure that your data is safely saved until you have finished using it.
3) scalability:
Because message queues decouple your processing, it is easy to increase the frequency of queuing and processing of messages, as long as you add additional processing.
4) flexibility & peak processing power:
Applications still need to continue to play a role in the face of a surge in traffic, but such bursts of traffic are not common. It is undoubtedly a huge waste to invest resources at any time in order to be able to handle this kind of peak access. The use of message queuing enables key components to withstand sudden access pressure without completely collapsing due to sudden overloaded requests.
5) recoverability:
When some components of the system fail, the whole system will not be affected. Message queuing reduces the coupling between processes, so even if a process that processes messages dies, messages added to the queue can still be processed after the system is restored.
6) order guarantee:
In most usage scenarios, the order of data processing is important. Most message queues are sorted by nature, and it is guaranteed that the data will be processed in a specific order. (Kafka ensures the order of messages in a Partition, but cannot guarantee the overall order. There is only one partition for a topic.)
7) buffer:
It is helpful to control and optimize the speed of data flow through the system and solve the inconsistency between the processing speed of production messages and consumer messages.
8) Asynchronous communication:
In many cases, users do not want or need to process messages immediately. Message queuing provides an asynchronous processing mechanism that allows users to put a message on the queue, but does not process it immediately. Put as many messages as you want in the queue, and then process them when needed.
1.4 consumption model
After the message is published by the producer to the Kafka cluster, it will be consumed by consumers. There are two kinds of message consumption models: push model (push) and pull model (pull).
1.4.1 push Model (push)
is a message system based on push model (push), in which the consumption status of consumers is recorded by message agents. After pushing the message to the consumer, the message broker marks the message as consumed, but there is no good guarantee that the message will be processed. For example, after the message agent sends out the message, when the consumer process dies or does not receive the message due to network reasons, the message may be lost (because the message agent has marked the message as consumed). But in fact, the message is not actually processed. If you want to ensure that the message is processed, after the message agent has sent the message, you need to set the status to "sent" and update it to "consumed" only after receiving a confirmation request from the consumer, which requires that all consumption status be recorded in the message broker. This is obviously not desirable.
1.4.2 pull Model (pull)
Kafka uses the pull model, in which consumers record the consumption status themselves, and each consumer reads the messages of each partition independently and sequentially. As shown in the following figure, there are two consumers (different consumer groups) pulling messages on the same topic. Consumer A has a consumption progress of 3 and Consumer B has a consumption progress of 6. The maximum limit pulled by the consumer is controlled by the maximum water level (watermark) (that is, only the last message of the current topic can be obtained). If the latest message written by the producer has not reached the number of backups (that is, to ensure that the number of copies is written to ensure that the message is not lost, so that the message is not lost to consumers), it is not visible to consumers. The advantage of this consumer-controlled offset is that consumers can consume messages in any order. For example, consumers can reset to the old offset and reprocess previously consumed messages, or jump directly to the nearest location and start spending from the current moment.
figure 1.1 kafka consumption model
in some messaging systems, message agents delete messages immediately after they are consumed. If there are different types of consumers subscribing to the same topic, the message broker may need to store the same message redundantly, or wait until all consumers have finished consuming, which requires the message broker to track the consumption status of each consumer. this design greatly limits the overall throughput and processing latency of the message system. Kafka's practice is that all messages published by producers are consistently stored in the Kafka cluster, regardless of whether the messages are consumed or not. Users can clean up expired data by setting the retention time, for example, by setting the retention policy to two days. Then, after the message is released, it can be consumed by different consumers, and after two days, expired messages will be automatically cleaned up.
Key concepts in 1.5 kafka 1) Producer: the message producer is the client that sends messages to kafka broker. 2) Consumer: message consumer, client that fetches messages from kafka broker 3) Topic: can be understood as a queue. Is a grouping of messages 4) Consumer Group (CG): an extensible and fault-tolerant consumer mechanism provided by kafka. Since it is a group, there must be multiple consumers or consumer instances (consumer instance) within the group that share a common ID, the group ID. All consumers in the group coordinate to consume all the partition of the subscription topic (subscribed topics). Of course, each partition can only be consumed by one consumer within the same consumer group. However, it is possible for different consumer groups to consume the same topic, and it does not affect each other. Consumers can read a large number of messages at the same time through horizontal scaling. In addition, if a consumer fails, other group members will automatically load balance the previously failed consumer read partition 5) Broker: a kafka server is a broker. A cluster consists of multiple broker. A broker can hold multiple topic. 6) Partition: to achieve scalability, a very large topic can be distributed to multiple broker (that is, servers), a topic can be divided into multiple partition, and each partition is an ordered queue. Each message in partition is assigned an ordered id (offset). Kafka only guarantees that messages are sent to consumer in the order in one partition, not the order of a topic as a whole (among multiple partition). 7) the storage files of Offset:kafka are named according to offset.kafka, and the advantage of using offset as a name is that it is easy to find. For example, if you want to find the location at 2049, just find the 2048.kafka file. Of course, the first offset is 00000000000.kafka. Generally speaking, if the topic "test" has two partitions, then there are two storage directories for the topic, named test-0,test-1, and then save the corresponding data in the corresponding directory. 2. Kafka deployment 2.1Environmental planning and preparation
First, kafka relies on zookeeper to store meta-information and requires jdk to run the program. So you need to deploy these two in advance. Please read the previous article.
Prepare three virtual machines:
Bigdata121bigdata122bigdata123zookeeper1zookeeper2zookeeper3kafka1kafkakafka3
Software version:
Jdk1.8zookeeper3.4.10kafka2.1.1centos7.2.15112.2 deployment
Bigdata121:
1. Decompress: tar zxf kafka_2.11-2.1.1.tgz-C / opt/modules/2, create log directory: mkdir / opt/modules/kafka_2.11-2.1.1/logs3, modify kafka server configuration file: vim / opt/modules/kafka_2.11-2.1.1/config/server.properties#### modify some key configurations # globally unique number of broker You cannot repeat whether broker.id=0# allows topic to be deleted. The test environment is convenient to test and is set to true. It is recommended that the production environment be set to the path where falsedelete.topic.enable=true#kafka running logs are stored, log.dirs=/opt/modules/kafka_2.11-2.1.1/logs#, configure the connection Zookeeper cluster address, and / path/to is the path of the root node stored in zookeeper. For example, / rootzookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181/path/to4, configure environment variable vim / etc/profile.d/kafka.sh #! / bin/bashexport KAFKA_HOME=/opt/modules/kafka_2.11-2.1.1export PATH=$PATH:$ {KAFKA_HOME} / bin5, enable environment variable source / etc/profile.d/kafka.sh
Once configured, rsync the entire directory of kafka to the / opt/modules of the other two hosts, and modify
/ opt/modules/kafka_2.11-2.1.1/config/server.properties this configuration file
Broker.id=1 and broker.id=2 are the id of every broker must be unique.
Start the kafka cluster node on the three machines:
Kafka-server-start.sh-daemon config/server.properties-daemon indicates the configuration file path to start the kafka service config/server.properties server as a background process
Stop the current node:
Kafka-server-stop.sh2.3 common command line operations 1) View all topic [root@bigdata11 kafka] $bin/kafka-topics.sh-- zookeeper bigdata13:2181-- list2) create topic [root@bigdata11 kafka] $bin/kafka-topics.sh-- zookeeper bigdata13:2181-- create-- replication-factor 3-- partitions 1-- topic first option description:-- topic definition topic name-- number of replication-factor definition copies-- partitions Define the number of partitions 3) delete topic [root@bigdata11 kafka] $bin/kafka-topics.sh-- zookeeper bigdata11:2181-- delete-- topic first requires delete.topic.enable=true to be set in server.properties, otherwise it is just marked to be deleted or restarted directly. 4) send the message [root@bigdata11 kafka] $bin/kafka-console-producer.sh-- broker-list bigdata11:9092-- topic first > hello world5) consumption message [root@bigdata12 kafka] $bin/kafka-console-consumer.sh-- bootstrap-server node3:9092-- from-beginning-- topic first--from-beginning: it will read out all the previous data in the first topic. Choose whether or not to add the configuration according to the business scenario. 6) View the details of a Topic [root@bigdata11 kafka] $bin/kafka-topics.sh-- zookeeper bigdata11:2181-- describe-- topic first III, how kafka works 3.1 kafka producer write process analysis 3.1.1 write mode
producer uses push mode to publish messages to broker, and each message is append to patition, belonging to sequential write disk (sequential write disk efficiency is higher than random write memory, ensuring kafka throughput)
3.1.2 partition (partition)
The Kafka cluster consists of multiple message broker servers (broker-server), and each message published to the Kafka cluster has a category, represented by a topic. In general, different applications produce different types of data, and different themes can be set. A topic usually has multiple message subscribers, and when a producer publishes a message to a topic, consumers who subscribe to that topic can receive new messages written by the generator.
The Kafka cluster maintains distributed partition log files for each topic, and the topic (topic) can be physically thought of as partitioned log files (partition log). Each partition of the topic is an ordered, immutable sequence of records, and new messages are constantly appended to the log. Each message in the partition is assigned a monotonously increasing sequence number in chronological order, called offset, which uniquely locates each message in the current partition.
messages are sent to a topic, which is essentially a directory, while topic is composed of some Partition Logs (partition logs). Its organizational structure is shown in the following figure: the topic in the following figure has three partitions, and the offset of each partition starts from 0, and the offsets between different partitions are independent and will not affect each other.
figure 3.1 kafka write mode
figure 3.2 kafka partition read
We can see that the messages in each Partition are ordered, the production messages are constantly appended to the Partition log, and each message is assigned a unique offset value.
Each message posted to a Kafka topic includes a key value and a timestamp. When the message reaches the specified partition on the server side, it is assigned a self-increasing offset. The original message content and the assigned offset, as well as some other metadata information, are eventually stored in the partition log file. The key of the message can also be unset, in which case the message will be evenly distributed to different partitions.
3.1.2.1 reasons for Partition
(1) it is easy to expand in a cluster. Each Partition can be adjusted to adapt to its machine, and a topic can be composed of multiple Partition, so the whole cluster can adapt to data of any size.
(2) concurrency can be improved because you can read and write in units of Partition.
The traditional message system maintains the order of messages on the server. If multiple consumers consume the same message queue, the server will send it to the consumer in the order of consumption and storage. However, because messages are sent to consumers asynchronously, the order in which messages arrive at consumers may be out of order, which means that in parallel consumption, traditional messaging systems can not guarantee that messages are processed sequentially. Although we can set up a dedicated consumer to consume only one queue to solve the problem of message order, this makes consumption processing impossible.
Kafka has a stronger sequential guarantee than traditional messaging systems, and it uses the partition of the topic as the parallel unit of message processing. With partitions as the smallest granularity, Kafka assigns each partition to a different and unique consumer in the consumer group, and ensures that a partition belongs to only one consumer, that is, that consumer is the only reader thread for that partition. Then, as long as the messages in the partition are ordered, the order of messages processed by the consumer is guaranteed. Each topic has multiple partitions, and different consumers deal with different partitions, so Kafka not only ensures the order of messages, but also achieves consumer load balancing.
3.1.2.2 principles of zoning
(1) if patition is specified, use the
(2) No patition but key is specified, and a patition is obtained by hash the key.
(3) neither patition nor key is specified, and a patition is selected by polling.
Let's look at the source code of the default partition implementation class:
DefaultPartitioner class public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) {List partitions = cluster.partitionsForTopic (topic); int numPartitions = partitions.size (); / / No key specified, poll to get partition number if (keyBytes = = null) {int nextValue = nextValue (topic); List availablePartitions = cluster.availablePartitionsForTopic (topic) If (availablePartitions.size () > 0) {int part = Utils.toPositive (nextValue)% availablePartitions.size (); return availablePartitions.get (part) .partition ();} else {/ / no partitions are available, give a non-available partition return Utils.toPositive (nextValue)% numPartitions }} else {/ / when key is specified, hash the key to get the partition number / / hash the keyBytes to choose a partition return Utils.toPositive (Utils.murmur2 (keyBytes))% numPartitions;}} 3.1.3 copy (replication)
there may be multiple replication in the same partition (corresponding to the default.replication.factor=N in the server.properties configuration). Without replication, once the broker goes down, all the patition data on it cannot be consumed (the data is lost directly), and producer can no longer store the data in its patition. After the introduction of replication, there may be multiple replication in the same partition. In this case, you need to select one of these replication to interact with this leader only (read and write operations will only interact with leader), and other replication will copy data from the leader as follower and will not perform other operations. When the leader dies, a new leader is selected in the follower.
3.1.4 write process
1) producer first finds the leader of the partition from the "/ brokers/.../state" node of the zookeeper
For example, a complete path such as / brokers/topics/TOPIC_NAME/partitions/NUM_OF_PARTITION/state
2) producer sends messages to the leader
3) leader writes the message to the local log
4) followers sends ACK to leader after writing from leader pull message to local log
5) after leader receives the ACK of replication in all ISR, add HW (high watermark, the last offset of commit) and send ACK to producer
3.2Storage principle of brokers 3.2.1 Storage method
Physically divide topic into one or more patition (corresponding to the num.partitions=3 configuration in server.properties, which is the default number of partitions, which can be manually specified when creating a topic), and each patition physically corresponds to a folder (which stores all messages and index files of the patition), as follows:
The partition directory is named in the form of topicName-partiontionNum. First of all, we created the topic first, which has three partition,0, 1, 2 [root@bigdata11 logs] $lldrwxrwxr-x. 2 root root 4096 August 6 14:37 first-0drwxrwxr-x. 2 root root 4096 August 6 14:35 first-1drwxrwxr-x. 2 root root 4096 August 6 14:37 first-2 [root@bigdata11 logs] $cd first-0 [root@bigdata11 first-0] $ll-rw-rw-r--. 1 root root 10485760 August 6 14:33 00000000000000000000.index this is the index-rw-rw-r--. 1 root root 219 August 6 15:07 00000000000000000000.log this is the partition log, where the messages are stored-rw-rw-r--. 1 root root 10485756 August 6 14:33 0000000000000000.timeindexMurray 1 root root 8 August 6 14:37 leader-epoch-checkpoint3.2.2 Storage Policy
As mentioned earlier, kafka retains all messages regardless of whether the messages are consumed or not, and consumers can offset the data at any time as needed. There are two strategies to delete old data:
1) based on time: log.retention.hours=168, that is, data from 7 days ago is deleted by default
2) based on size: log.retention.bytes=1073741824, deleted more than 1GB
It should be noted that because the time complexity of Kafka reading a specific message is O (1) (because it is read directly through the index, it has nothing to do with size), that is, it has nothing to do with file size, so deleting expired files here has nothing to do with improving Kafka performance.
3.2.3 zookeeper storage structure
Zookeeper stores some meta-information about the entire kafka cluster, such as which broker, which topic, and so on. Let's look at the structure:
figure 3.3 zookeeper storage structure
Some of these directories serve the following purposes:
/ brokers/topics/TOPIC_NAME/partitions/PARTITION_NUM/state: specifies the meta-information of the specified partition of the topic, which stores the id of the broker where the partition leader resides, and which broker all copies are stored in. / brokers/ids/xxxx: what are the broker and the corresponding id/consumer: registered consumer information, such as consumer group id, consumption topic, consumption offset, which consumer in the consumer group consumes which partition, etc. It should be noted that only consumer will be registered with zookeeper, producer will not register with zookeeper 3.3 kafka consumption process analysis
Kafka supports high-level api and low-level api for operation.
3.3.1 Advanced api
1) Advanced API benefits
Advanced API is easy to write
There is no need to manage offset on your own, the system manages itself through zookeeper.
There is no need to manage partitions, replicas, etc., the system automatically manages.
Consumer disconnection will automatically obtain data according to the last offset recorded in zookeeper (the default setting is 1 minute to update the offset stored in zookeeper)
You can use group to distinguish between different program accesses to the same topic (different group records different offset, so that different programs reading the same topic do not interact with each other because of offset)
2) disadvantages of advanced API
Cannot control offset on its own (for some special needs)
Cannot refine controls such as partitions, replicas, zk, etc.
3.3.2 low-level api
1) advantages of low-level API
It allows developers to control the offset and read from wherever they want.
Self-control the connection partition and load balance the partition customization.
Reduced dependence on zookeeper (for example, offset does not have to rely on zk storage, you can store offset on your own, such as in files or memory)
2) shortcomings of low-level API
Too complex, need to control the offset, which partition to connect, find the partition leader and so on.
3.3.3 definition of consumer group
consumers work as consumer group consumer groups, where one or more consumers are grouped together to consume a topic. Each partition can only be read by one consumer in the group at a time, but multiple group can consume the partition at the same time. A consumer reads a partition, or a consumer is the owner of a partition.
in this case, consumers can read a large number of messages at the same time through horizontal scaling. In addition, if a consumer fails, other group members automatically read the partition read by the previously failed consumer.
3.3.4 consumption pattern
consumer uses pull (pull) mode to read data from broker.
The push model is difficult to adapt to consumers with different consumption rates, because the message delivery rate is determined by broker. Its goal is to deliver messages as quickly as possible, but it is easy to cause consumer to be too late to process messages, typically characterized by denial of service and network congestion. On the other hand, pull mode can consume messages at an appropriate rate according to the consumption power of consumer.
for Kafka, pull mode is more suitable, it can simplify the design of broker, consumer can independently control the rate of consuming messages, while consumer can control its own consumption mode, that is, it can consume in batches or one by one, and it can also choose different submission methods to achieve different transmission semantics.
The drawback of the pull pattern is that if kafka does not have data, consumers may fall into a loop and wait for the data to arrive. To avoid this, we have parameters in our pull request that allow the consumer request to block in a "long poll" waiting for data to arrive (and optionally wait to a given number of bytes to ensure a large transfer size).
Fourth, kafka api uses 4.1Environment preparation
Idea creates a maven project and adds kafka dependencies:
Org.apache.kafka kafka_2.12 2.1.1 org.apache.kafka kafka-streams 2.1.1 4.2 producer api (java) 4.2.1 create producer (old api) import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig Public class OldProducer {@ SuppressWarnings ("deprecation") public static void main (String [] args) {Properties properties = new Properties (); / specify broker address list properties.put ("metadata.broker.list", "bigdata11:9092"); / / specify producer requires broker to send ack acknowledgement message properties.put ("request.required.acks", "1") / / specify serialization class properties.put ("serializer.class", "kafka.serializer.StringEncoder"); / / use the above configuration items to create kafka producer Producer producer = new Producer (new ProducerConfig (properties)); / / send the message KeyedMessage message = new KeyedMessage ("first", "hello world"); producer.send (message) }} 4.2.2 create producer (new api) import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class NewProducer {public static void main (String [] args) {Properties props = new Properties () / / Kafka server hostname and port number props.put ("bootstrap.servers", "bigdata12:9092"); / / waiting for props.put ("acks", "all") from all replica nodes; / / maximum number of attempts to send messages props.put ("retries", 0) / / batch message processing size props.put ("batch.size", 16384); / request delay props.put ("linger.ms", 1); / / send cache memory size props.put ("buffer.memory", 33554432) / / key serialize props.put ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); / / value serialize props.put ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducer (props); for (int I = 0; I
< 50; i++) { producer.send(new ProducerRecord("first", Integer.toString(i), "hello world-" + i)); } producer.close(); }}4.2.3 创建生产者带回调函数package com.king.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CallBackProducer { public static void main(String[] args) {Properties props = new Properties(); // Kafka服务端的主机名和端口号 props.put("bootstrap.servers", "bigdata12:9092"); // 等待所有副本节点的应答 props.put("acks", "all"); // 消息发送最大尝试次数 props.put("retries", 0); // 一批消息处理大小 props.put("batch.size", 16384); // 增加服务端请求延时 props.put("linger.ms", 1);// 发送缓存区内存大小 props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(props); for (int i = 0; i < 50; i++) { kafkaProducer.send(new ProducerRecord("first", "hello" + i), new Callback() { //重写里面的回到方法 @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (metadata != null) { System.out.println(metadata.partition() + "---" + metadata.offset()); } } }); } kafkaProducer.close(); }}4.2.4 自定义分区生产者 旧api: import java.util.Map;import kafka.producer.Partitioner;public class CustomPartitioner implements Partitioner { public CustomPartitioner() { super(); } @Override public int partition(Object key, int numPartitions) { // 控制分区 return 0; }} 新api: import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;public class CustomPartitioner implements Partitioner { @Override public void configure(Map configs) { } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 控制分区 return 0; } @Override public void close() { }} 实现好自定义的分区类之后,需要在创建producer的配置项添加指定自定义分区类的配置: properties.put("partitioner.class", "自定义的分区类名,需要全类名");4.3 消费者api(java)4.3.1 创建消费者(旧api)import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;public class CustomConsumer { @SuppressWarnings("deprecation") public static void main(String[] args) { Properties properties = new Properties(); properties.put("zookeeper.connect", "bigdata11:2181"); properties.put("group.id", "g1"); properties.put("zookeeper.session.timeout.ms", "500"); properties.put("zookeeper.sync.time.ms", "250"); properties.put("auto.commit.interval.ms", "1000"); // 创建消费者连接器 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); //需要自己维护offset HashMap topicCount = new HashMap(); topicCount.put("first", 1); Map consumerMap = consumer.createMessageStreams(topicCount); KafkaStream stream = consumerMap.get("first").get(0); ConsumerIterator it = stream.iterator(); while (it.hasNext()) { System.out.println(new String(it.next().message())); } }}4.3.2 创建消费者(新api)import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class CustomNewConsumer { public static void main(String[] args) { Properties props = new Properties(); // 定义kakfa 服务的地址,不需要将所有broker指定上 props.put("bootstrap.servers", "bigdata11:9092"); // 制定consumer group props.put("group.id", "test"); // 是否自动确认offset props.put("enable.auto.commit", "true"); // 自动确认offset的时间间隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 定义consumer KafkaConsumer consumer = new KafkaConsumer(props); // 消费者订阅的topic, 可同时订阅多个 consumer.subscribe(Arrays.asList("first", "second","third")); while (true) { // 读取数据,读取超时时间为100ms ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }}五、kafka拦截器5.1 拦截器原理 Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括: (1)configure(configs) 获取配置信息和初始化数据时调用。 (2)onSend(ProducerRecord): 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算 (3)onAcknowledgement(RecordMetadata, Exception): 该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率 (4)close: 关闭interceptor,主要用于执行一些资源清理工作 如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。 5.2 拦截器实例 需求: 实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。 程序: (1)实现时间拦截器: package com.king.kafka.interceptor;import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class TimeInterceptor implements ProducerInterceptor { @Override public void configure(Map configs) { } @Override public ProducerRecord onSend(ProducerRecord record) { // 创建一个新的record,把时间戳写入消息体的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { }} (2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器 package com.king.kafka.interceptor;import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map configs) { } @Override public ProducerRecord onSend(ProducerRecord record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 统计成功和失败的次数 if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存结果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); }} (3)producer主程序 package com.king.kafka.interceptor;import java.util.ArrayList;import java.util.List;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;public class InterceptorProducer { public static void main(String[] args) throws Exception { // 1 设置配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "bigdata11:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2 构建拦截链 List interceptors = new ArrayList(); interceptors.add("com.king.kafka.interceptor.TimeInterceptor"); interceptors.add("com.king.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); String topic = "first"; Producer producer = new KafkaProducer(props); // 3 发送消息 for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord(topic, "message" + i); producer.send(record); } // 4 一定要关闭producer,这样才会调用interceptor的close方法 producer.close(); }} (4)测试 (1)在kafka上启动消费者,然后运行客户端java程序。[root@bigdata11 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic first1501904047034,message01501904047225,message11501904047230,message21501904047234,message31501904047236,message41501904047240,message51501904047243,message61501904047246,message71501904047249,message81501904047252,message9(2)观察java平台控制台输出数据如下:Successful sent: 10Failed sent: 0六、kafka stream6.1 kafka stream概念以及特点 Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。有如下特点: 1)功能强大 高扩展性,弹性,容错 2)轻量级 无需专门的集群 一个库,而不是框架 3)完全集成 100%的Kafka 0.10.0版本兼容 易于集成到现有的应用程序 4)实时性 毫秒级延迟 并非微批处理 窗口允许乱序数据 允许迟到数据 6.2 为什么要有kafka stream 当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。 既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Stream呢?主要有如下原因。 第一,Spark和Storm都是流式处理框架,而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。 第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。 第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Stream的成本非常低。 第四,使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。但是Kafka作为类库不占用系统资源。 第五,由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。 第六,由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度。 6.3 kafka stream数据清洗小实例 (1)需求 实时处理单词带有">> > "the contents of the prefix. For example, enter" test > ximenqing ", which is finally processed to" ximenqing ".
(2) Code program:
Business processing class:
Package com.king.kafka.stream;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;// implements the Processor interface to implement specific business logic public class LogProcessor implements Processor {private ProcessorContext context; @ Override public void init (ProcessorContext context) {this.context = context } / / here is the specific business logic @ Override public void process (byte [] key, byte [] value) {String input = new String (value); / / if "> >" is included, only the content following the tag if (input.contains ("> >")) {input = input.split ("> >") [1] .trim () / / output to the next topic context.forward ("logProcessor" .getBytes (), input.getBytes ());} else {context.forward ("logProcessor" .getBytes (), input.getBytes ());} @ Override public void punctuate (long timestamp) {} @ Override public void close () {}}
Main class entry:
Package com.king.kafka.stream;import java.util.Properties;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorSupplier;import org.apache.kafka.streams.processor.TopologyBuilder;public class Application {public static void main (String [] args) {/ / define the input topic String from = "first" / / define output topic String to = "second"; / / set parameters Properties settings = new Properties (); settings.put (StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); settings.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata11:9092"); StreamsConfig config = new StreamsConfig (settings); / / build topology TopologyBuilder builder = new TopologyBuilder () / / create a builder and specify source, processor, and sink. And give them aliases. / / the parentName here actually specifies the name builder.addSource ("SOURCE", from) .addProcessor ("PROCESS", new ProcessorSupplier () {@ Override public Processor get () {/ / specific analysis processing return new LogProcessor ()) }, "SOURCE") .addSink ("SINK", to, "PROCESS"); / / build processing task, including configuration and task details KafkaStreams streams = new KafkaStreams (builder, config); streams.start ();}}
(3) testing
Run the program, and then start producer and consumer on the command line, depending on the situation:
Start producer [root@bigdata13 kafka] $bin/kafka-console-producer.sh on bigdata13-- broker-list bigdata11:9092-- topic first > hello > world > h > itstar > (6) launch consumer [root@bigdata12 kafka] $bin/kafka-console-consumer.sh-- zookeeper bigdata11:2181-- from-beginning-- topic secondworlditstar on bigdata12
You can see that the data processed by consumption is in line with expectations.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.