Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Kafka usage Summary and production and consumption Demo implementation

2025-10-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/03 Report--

What is kafka?

The Kafka official website's own introduction is: a streaming platform that can support distributed.

Introduction to kafka official website

There are three key capabilities of kafka: 1. Publish subscription record flow, similar to message queuing and enterprise information system 2. Store the record stream in a fault-tolerant persistent way 3. Convection processing kafka is usually used in two types of applications: 1. Build a real-time streaming data pipeline to reliably obtain data between systems or applications. Some basic concepts for building a real-time streaming application kafka that transforms or responds to data streams: 1.Kafka runs as a cluster on one or more servers that can span multiple data centers. The 2.Kafka cluster stores the stream of records in a category called topic. 3. Each record consists of a key, a value, and a timestamp. Kafka core API: 1.Producer API: allows applications to publish stream of records to one or more topic. 2.Consumer API: allows applications to subscribe to one or more topic and process the stream of records generated to them. 3.Streams API: allows the application to act as a stream processor, using input streams from one or more topic and generating one or more output topic output streams, thus effectively converting the input stream into the output stream. 4.Connector API: allows you to build and run reusable producers or consumers to connect topic to existing applications or data systems. For example, a connector to a relational database might capture every change to a table. As a messaging system

There are two types of traditional messaging models: message queuing and publish subscription. In a message queue, a consumer pool can read data from one server, and each record is sent to one of the servers; in publish-subscribe, the record is broadcast to all consumers. These two models have their own advantages and disadvantages:

Message queuing pros and cons: it allows you to partition data processing across multiple consumer instances, which allows you to extend processing. The queue is not multi-subscriber-once a process reads its lost data. Publish subscription pros and cons: Publish-subscribe allows you to broadcast data to multiple processes, but because each message is delivered to each subscriber, processing cannot be extended.

As a messaging system, how is it different from mq? (RabbitMq\ redis\ RocketMq\ ActiveMq)

RabbitMQ: following the AMQP protocol, developed by the inherent high concurrency erlang language, it is used for real-time message delivery that requires high reliability. Ten thousand-level data, the community activity is extremely high, the visual operation interface is rich. It provides comprehensive core functions and is an excellent product of message queuing. Because it is developed in erlang language, it is difficult to maintain and it is difficult for developers to redevelop. Redis: the main scenario of redis is an in-memory database, which is too unreliable as a message queue and too dependent on network IO for speed. The speed on the server is fast, and it is easy to have the problem of data accumulation, so it can be applied in lighter situations. RocketMq: rocketMq hundreds of thousands of levels of data, based on Java development. Is an Alibaba open source message product. To meet the Taobao Singles Day test, and the documentation is very perfect, with some advanced features that other message queues do not have, such as timing push, other message queues are delayed push, such as rabbitMq sets the delay push time by setting the expire field. For example, rocketmq implements distributed transactions, which is more reliable. RocketMq is also the only product that has been used to support distributed transactions. Kafka: kafka was originally designed for log statistical analysis, but now it can also do operational data analysis and statistics based on the background of big data. Kafka is a truly large-scale distributed message queue that provides fewer core functions. Distributed message subscription based on zookeeper. Hundreds of thousands of data levels, stronger than RokectMq. The communication between the client and the server is accomplished through a simple, high-performance, language-independent TCP protocol. ActiveMq: Apache ActiveMQ ™is the most popular open source, multiprotocol, java-based messaging server. It supports industry standard protocols, so users can choose clients in a variety of languages and platforms. You can use from C, C++, Python,. The connectivity of net et al. Integrate your multi-platform applications using the common AMQP protocol. Use STOMP to exchange messages between web applications on websockets. Use MQTT to manage Internet of things devices. Support your existing JMS infrastructure and others. ActiveMQ provides the power and flexibility to support any messagi.

Note: because this article mainly introduces kafka, so the above is only a simple list of some features, if interested students can analyze in detail, these products I will write a special article to summarize and analyze, here first briefly mentioned.

Why use message queuing?

This part is the expansion of content, many people, including the year I just graduated from the use of message queues, but people asked me why I use message queues, I do not have a very clear understanding, so I also talk about it here. I hope to give some help to the students in need.

So why use message queuing? First, let's review the message delivery. As far as the front end is concerned, the traditional way is to pass through global variables, followed by the concept of data bus, and then there are corresponding solution products such as vuex, redux, store and so on. For the back-end, the first communication between systems, message delivery are very dependent on communication objects each other, highly coupled, and then there are some products to solve these problems, such as webservice. But this way is extremely unfriendly, and maintenance is tedious, responsibilities are difficult to distinguish, and the workload increases, so after the birth of mq, basically solved these problems.

Message queuing was introduced to:

1. Decoupling: for example: a system operates p, needs to deliver the message to B, C two systems, if there is no message queue, then A system needs to send a message to B, then have to send a message to C, then one day D, E, F system said: a system you also have to send me p message, at this time A has to modify the code, release online, DEF can receive messages normally. Then after n days, C said, don't send me a message, get rid of the part that sent me the message. The developers of system A have to get rid of it again and release it online. In this way, day after day, with the increase of systems and the increase of access and exit operations, system A needs to be released online frequently, which reduces stability and availability time, and requires testing and tracking testing every time it is launched. The cost and risk are self-evident. Once message queuing is introduced, A does not need to care about who consumes and who exits, while An is only responsible for putting messages into the queue, while other systems only need to monitor this queue. Even if other systems exit, it will not have any impact on A. isn't it fragrant to be able to provide services continuously? two。 Asynchronous for example: the traditional way to send messages to B, C, D, requires 120ms, so if you use message queues, you can greatly reduce the time-consuming. But these apply to non-essential synchronous business logic. 3. In the traditional peak-cutting mode, the request goes directly to the database, and when the peak reaches a certain value, it is bound to die. If the middleware message queue is applied, then the normal service of the system can be ensured, which is the current limit often talked about in the second kill system, which can prevent the system from crashing and provide system availability. Configure MAVEN org.apache.kafka kafka_2.12 1.0.0 provided org.apache.kafka kafka-clients 1.0.0 org.apache.kafka kafka-streams 1.0.0 Health Producer / * * @ author chandlerHuang * @ description @ TODO * @ date 2020-1-15 * / public class KafkaProducerService implements Runnable {private final KafkaProducer producer Private final String topic; public KafkaProducerService (String topic) {Properties props = new Properties (); props.put ("bootstrap.servers", "bound public network IP:9092"); props.put ("acks", "all"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("key.serializer", StringSerializer.class.getName ()) Props.put ("value.serializer", StringSerializer.class.getName ()); this.producer = new KafkaProducer (props); this.topic = topic;} @ Override public void run () {int messageNo = 1; try {for (;;) {String messageStr= "[" + messageNo+ "]: hello,boys!" Producer.send (new ProducerRecord (topic, "Message", messageStr)); / / print if (messageNo0==0) {System.out.println ("sendMessages:" + messageStr) } / / quit if (messageNo00==0) {System.out.println ("successCount:" + messageNo); break;} messageNo++;}} catch (Exception e) {e.printStackTrace () } finally {producer.close ();}} public static void main (String args []) {KafkaProducerService test = new KafkaProducerService (TopicConstant.CHART_TOPIC); Thread thread = new Thread (test); thread.start ();}}

Consumer / * * @ author chandlerHuang * @ description @ TODO * @ date 2020-1-15 * / public class KafkaConsumerService implements Runnable {private final KafkaConsumer consumer; private ConsumerRecords msgList; private final String topic; private static final String GROUPID = "groupA"; public KafkaConsumerService (String topicName) {Properties props = new Properties (); props.put ("bootstrap.servers", "bound public IP:9092") Props.put ("group.id", GROUPID); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("auto.offset.reset", "earliest"); props.put ("key.deserializer", StringDeserializer.class.getName ()) Props.put ("value.deserializer", StringDeserializer.class.getName ()); this.consumer = new KafkaConsumer (props); this.topic = topicName; this.consumer.subscribe (Arrays.asList (topic));} @ Override public void run () {int messageNo = 1; System.out.println ("- start consumption -") Try {for (;;) {msgList = consumer.poll (1000) If (nullroomroommsgListroommsgList.count () > 0) {for (ConsumerRecord record: msgList) {/ / consume 100itemes to print But the printed data is not necessarily if (messageNo0==0) {System.out.println (messageNo+ "= receive: key =" + record.key () + ", value =" + record.value () + "offset=== + record.offset ()). } / / quit if (messageNo00==0) {break;} messageNo++ when 1000 items are consumed }} else {Thread.sleep (1000);} catch (InterruptedException e) {e.printStackTrace ();} finally {consumer.close () }} public static void main (String args []) {KafkaConsumerService test1 = new KafkaConsumerService (TopicConstant.CHART_TOPIC); Thread thread1 = new Thread (test1); thread1.start ();}}

Note: during the writing of the above demo, an Exception:Kafka java client connection exception (org.apache.kafka.common.errors.TimeoutException: Failed to update metadata) was found.

Server. Kafka needs to be configured. File:

Advertised.listeners=PLAINTEXT:// public network address: 9092 zookeeper.connect = private network address: 2181

If you are a CVM, set the corresponding port in the security group to open, otherwise you cannot access the response interface!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report