
Kafka cluster building (messaging)


1. Kafka usage background

When we operate many distributed databases and distributed computing clusters, we often run into problems like these:

We want to analyze user behavior (pageviews) so that we can design better advertising space.

We want to compile statistics on users' search keywords to analyze current trends.

Some data would be wasteful to store in a database, yet writing it straight to disk is inefficient.

All these scenarios have one thing in common:

Data is produced by an upstream module, and a downstream module uses that data for computation, statistics, and analysis. This is where a message system, especially a distributed message system, comes in.

2. The definition of Kafka

What is Kafka? It is a distributed messaging system written by LinkedIn in Scala and used as the basis for LinkedIn's activity stream (Activity Stream) and operational data processing pipeline (Pipeline). It offers high scalability and high throughput.

3. Comparison between Kafka and other mainstream distributed messaging systems.

Definition explanation:

1. Both Java and Scala are languages that run on the JVM.

2. Erlang, like the recently popular Go language, supports high concurrency at the language level, so RabbitMQ has strong concurrency performance out of the box. However, RabbitMQ implements AMQP strictly, which imposes many restrictions. Kafka is designed for high throughput, so it uses a high-performance but non-universal protocol of its own design, modeled on AMQP (Advanced Message Queuing Protocol).

3. The concept of a transaction: in a database, multiple operations are committed together, and either all succeed or all fail. The debit and credit in a bank transfer are a classic example: when you transfer money to someone, the operation counts as successful only once your transfer goes out and the other party's bank receives the money; if either side fails, the whole operation fails.

In a message queue, the corresponding notion is that multiple messages are sent together and either all succeed or all fail. Of the three systems, only ActiveMQ supports this, because RabbitMQ and Kafka give up transaction support in exchange for higher performance.

4. Cluster: a whole made up of multiple servers, transparent to producers and consumers. Adding a server to, or removing one from, the messaging system's cluster is imperceptible to both producers and consumers.

5. Load balancing: for a messaging system, load balancing means that when a large number of producers and consumers send requests to the system, the system must spread those requests so that every server carries a balanced share, rather than letting a mass of requests fall on one or a few servers, overloading them and, in serious cases, causing them to stop serving or go down.

6. Dynamic scaling is a capability many companies require. Not supporting dynamic scaling means stopping the service in order to scale, which is unacceptable to many companies.

Note:

Alibaba's Metaq and RocketMQ both carry Kafka's shadow: they either modified Kafka or borrowed from its design. Finally, Kafka's dynamic scaling is implemented through Zookeeper.

Zookeeper is widely used in distributed systems for distributed state management, distributed coordination, distributed configuration management, and distributed lock services. Adding or removing a Kafka server triggers corresponding events on Zookeeper nodes; the Kafka system captures these events to perform a new round of load balancing, and clients also capture them to perform a new round of processing.

II. Kafka related concepts

1. AMQP protocol

The Advanced Message Queuing Protocol (AMQP) is an open standard application-layer protocol for message-oriented middleware (Message Oriented Middleware). AMQP defines the format of the byte stream sent over the network, so compatibility is very good: any program that implements AMQP can interact with any other AMQP-compatible program, which makes cross-language and cross-platform interoperability easy.

The three popular message queuing systems mentioned above either support the AMQP protocol or drew on its ideas in their design and implementation.

2. Some basic concepts

1. Consumer (Consumer): a client application that requests messages from the message queue.

2. Producer (Producer): an application that publishes messages to a broker.

3. AMQP server (broker): receives messages sent by producers and routes them to queues inside the server. A Kafka broker appends the messages producers send to disk and assigns each message an offset, so a Kafka broker is one running instance of the Kafka application.

Client languages supported by Kafka

Kafka clients support most of the current mainstream languages, including: C, C++, Erlang, Java, .NET, Perl, PHP, Python, Ruby, Go, JavaScript

You can use any of the above languages to communicate with Kafka servers, that is, to write your own consumer that subscribes to messages from the Kafka cluster, or to write your own producer program.

3. Kafka architecture

The overall structure is: producers produce messages, the Kafka cluster stores and forwards them, and consumers fetch messages, as shown below:

Messages in the Kafka cluster are organized by Topic (topic), as shown in the following figure:

Some basic concepts:

1. Topic: a topic is similar to a news category such as sports, entertainment, or education. In practical engineering, there is usually one topic per business domain.

2. Partition: the message data in a Topic is organized into multiple partitions. The partition is the smallest unit of organization in a Kafka message queue; each partition can be regarded as a FIFO (First In First Out) queue.

Kafka's partitions are the key to its performance. When you find that cluster performance is low, the common remedy is to increase a Topic's partitions. Messages within a partition are kept in order from old to new: producers append messages at the tail of the queue, and consumers subscribe to messages starting from the head.

Working diagram:

Backup (Replication): to ensure distributed reliability, Kafka 0.8 began replicating each partition's data (on different Brokers), to prevent one Broker going down from making the data on its partitions unavailable.

Kafka 0.8 is a big change from 0.7: 1) it added replication; 2) it added the concept of a controller node, along with cluster leader election.

III. Zookeeper cluster construction

The Kafka cluster stores its state in Zookeeper, so the first step is to set up the Zookeeper cluster.

1. Software environment (3 servers for testing; usually an odd number of servers)

vim /etc/hosts   # all 3 servers need these entries

192.168.11.128 server1

192.168.11.129 server2

192.168.11.130 server3

1. Use one, three, five, ... (2n+1) Linux servers. A Zookeeper cluster can provide service only while more than half of its members are working, so with three servers one is allowed to fail. There is no need to use an even number.

If there are four, then after one fails three servers remain and the cluster still works, but it cannot survive one more failure. Remember: the rule is more than half.

2. Java JDK 1.8. Zookeeper is written in Java, so it needs a Java environment; Java runs on the Java virtual machine.

3. A stable Zookeeper release; here, Zookeeper 3.4.14.


2. Configure and install zookeeper

Perform the following operations identically on all three servers.

1. Install Java (here I install from the JDK tarball)

First obtain the JDK package and extract it to /usr/local.

Create a soft link
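A minimal sketch of these two steps, assuming a JDK 8 tarball (the exact file and directory names are illustrative; substitute your version):

# extract the JDK tarball to /usr/local (filename is an assumption)
tar xf jdk-8u201-linux-x64.tar.gz -C /usr/local
# create a version-independent soft link
ln -s /usr/local/jdk1.8.0_201 /usr/local/jdk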

Write java environment variables

vim /etc/profile.d/aa.sh
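A sketch of what aa.sh might contain, assuming the soft link /usr/local/jdk created above:

export JAVA_HOME=/usr/local/jdk
export PATH=$JAVA_HOME/bin:$PATH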

Load the environment variables:

source /etc/profile (loads all environment variables), or source /etc/profile.d/aa.sh (loads just this one file)

Check whether Java was installed successfully:
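For example (the exact version banner will vary by build):

java -version
# a version string such as "1.8.0_201" indicates the JDK is on the PATH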

If a version string is printed, the Java environment has been deployed successfully.

Alternatively, install Java with yum:

yum list java* && yum -y install java

2. Download Zookeeper (perform on all 3 servers)

First, pay attention to how the directory structure is defined in a production environment, so that the project you need can still be found once projects multiply.

# the directory goes under /opt
# first create the Zookeeper project directory
cd /opt
mkdir zookeeper             # project directory
mkdir zookeeper/zkdata      # stores snapshot logs
mkdir zookeeper/zkdatalog   # stores transaction logs

Download Zookeeper:

# download the software
cd /opt/zookeeper/
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

Extract the software:

tar xf zookeeper-3.4.14.tar.gz

3. Modify the configuration file

Go into the conf directory inside the extracted directory and take a look:

# zoo_sample.cfg is the official Zookeeper template file; copy it to zoo.cfg, which is the file name Zookeeper expects.
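For example:

cp zoo_sample.cfg zoo.cfg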

The configuration file is the same on all 3 servers:

vim zoo.cfg
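A sketch of zoo.cfg assembled from the explanations below (the IPs follow the addresses used later in this article's zookeeper.connect line; the /etc/hosts example above uses a different range, so adjust to your machines; clientPort 12181 is the modified port used throughout this article; treat the exact values as assumptions):

tickTime=2000
initLimit=5
syncLimit=5
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zkdatalog
clientPort=12181
server.1=192.168.11.139:2888:3888
server.2=192.168.11.140:2888:3888
server.3=192.168.11.141:2888:3888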

The dataDir and clientPort lines that the template already defines must be commented out (or replaced), otherwise the cluster will report errors later.

# In server.1, the 1 is the server's ID (any number can be used); it identifies which server this is, and the same ID must be written into the myid file in the snapshot directory.

# 192.168.11.139 is the server's IP address in the cluster. The first port (default 2888) is for communication between leader and followers; the second port (default 3888) is for leader election, used when the cluster starts or after the leader goes down.

Explanation of the configuration options:

# tickTime:

This is the interval at which Zookeeper servers (and clients and servers) maintain heartbeats: a heartbeat is sent every tickTime milliseconds.

# initLimit:

This configures the maximum number of heartbeat intervals that may pass when a client initializes a connection to the Zookeeper cluster. The client here is not a user client but a Follower server in the cluster connecting to the Leader. If no reply has been received after more than 5 heartbeats (tickTime), the connection attempt is considered failed. The total allowed time is 5 x 2000 ms = 10 seconds.

# syncLimit:

This identifies the time allowed for exchanging messages (requests and replies) between Leader and Follower; it must not exceed this many tickTime intervals. The total allowed time is 5 x 2000 ms = 10 seconds.

# dataDir:

The storage path for snapshot logs.

# dataLogDir:

The storage path for transaction logs. If this is not configured, transaction logs are stored in the dataDir directory by default, which seriously affects zk performance: when zk throughput is high, too many transaction logs and snapshots pile up in the same place.

# clientPort:

The port on which clients connect to the Zookeeper server; Zookeeper listens here and accepts client requests. In this article it has been changed to a larger, non-default port (12181).

Create myid files (each one is different)

# server1
echo "1" > /opt/zookeeper/zkdata/myid

# server2
echo "2" > /opt/zookeeper/zkdata/myid

# server3
echo "3" > /opt/zookeeper/zkdata/myid

4. Important configuration instructions

1. The myid file, stored in the snapshot directory, identifies this particular server (matching the ID in server.N); it is an important identifier the whole zk cluster uses for its members to discover each other.

2. zoo.cfg, in the conf directory, is the Zookeeper configuration file.

3. log4j.properties, in the conf directory, is zk's log output configuration. Programs written in Java have one thing in common: they mostly manage their logging through log4j.

4. zkEnv.sh and zkServer.sh

zkServer.sh is the main management script; zkEnv.sh configures the environment variables used when the Zookeeper cluster starts.

5. There is one more thing to pay attention to

With the default configuration, the ZooKeeper server does not remove old snapshots and log files (see autopurge in the official docs); this is the operator's responsibility.

You can, however, clean them up regularly with a script.
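A sketch of such a cleanup script, assuming the zkdata and zkdatalog paths created earlier (Zookeeper keeps its files under a version-2 subdirectory):

#!/bin/bash
# directory holding snapshots
dataDir=/opt/zookeeper/zkdata/version-2
# directory holding transaction logs
dataLogDir=/opt/zookeeper/zkdatalog/version-2
# keep the newest 66 files in each directory, delete the rest
count=66
count=$((count+1))
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f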

The above script deletes files in the two corresponding directories while keeping the newest 66. You can add it to crontab and have it run once a day at 2:00 a.m.
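For example, assuming the script is saved as /opt/zookeeper/clean_zk_log.sh (the path is an assumption), the crontab entry would look like:

0 2 * * * /bin/bash /opt/zookeeper/clean_zk_log.sh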

5. Start the service and check it

1. Start the service

# go to Zookeeper's bin directory
cd /opt/zookeeper/zookeeper-3.4.14/bin

# start the service (all 3 servers require this)
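The standard start command, run from the bin directory on each server:

./zkServer.sh start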

2. Check the service status

# check server status (there will be one leader and two followers)
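For example:

./zkServer.sh status
# one server should report "Mode: leader", the other two "Mode: follower"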

Generally, a zk cluster has only one leader and multiple followers. The leader handles clients' read and write requests, the followers sync data from it, and when the leader goes down, a new leader is elected from among the followers.

You can use jps to view the zk process; it runs the main class of the whole project.

# execute the command jps
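A sample check (the process ID will differ):

jps
# the output should include Zookeeper's main class, e.g.:
# 2818 QuorumPeerMain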

IV. Building the Kafka cluster

1. Software environment

1. One or more Linux servers; for a cluster, at least 2.

2. The Zookeeper cluster built above.

3. Software version: kafka_2.11-2.2.0.tgz (the version downloaded below).

2. Create a directory and download the installation software (3 servers)

# create the directories
cd /opt
mkdir kafka       # create the project directory
cd kafka
mkdir kafkalogs   # create the Kafka message directory, which mainly stores Kafka messages

# download the software
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz

# extract the software
tar -zxvf kafka_2.11-2.2.0.tgz

3. Modify the configuration file

Go into the config directory:

cd /opt/kafka/kafka_2.11-2.2.0/config/

Our main concern is server.properties, which can be found in this directory:

There are many files here; you will even find Zookeeper's files among them. Kafka ships with a built-in zk that we could start the cluster from, but a separate zk cluster is recommended.

Modify the configuration file

broker.id=0 # the unique ID of this machine in the cluster; same in nature as Zookeeper's myid

port=19092 # the port Kafka serves on; the default is 9092

host.name=192.168.7.100 # this parameter is off (commented out) by default; in 0.8.1 there was a bug involving DNS resolution and failure rates

num.network.threads=3 # the number of threads the broker uses for network processing

num.io.threads=8 # the number of threads the broker uses for disk I/O

log.dirs=/opt/kafka/kafkalogs/ # the directory where messages are stored; it can be a comma-separated list of directories. num.io.threads above should not be less than the number of directories. If multiple directories are configured, a newly created topic persists its messages in whichever of the comma-separated directories currently holds the fewest partitions.

socket.send.buffer.bytes=102400 # the size of the send buffer; data is not sent immediately but is first stored in the buffer and sent once it reaches a certain size, which improves performance

socket.receive.buffer.bytes=102400 # the size of Kafka's receive buffer; data is serialized to disk once it reaches a certain size

socket.request.max.bytes=104857600 # the maximum size of a request sent to or received by Kafka; this value must not exceed the Java heap size

num.partitions=1 # the default number of partitions for a topic

log.retention.hours=168 # the default maximum retention time for messages: 168 hours, i.e. 7 days

message.max.bytes=5242880 # the maximum size of a single message: 5 MB

default.replication.factor=2 # the number of replicas Kafka keeps of each message; if one replica fails, another can continue to provide service

replica.fetch.max.bytes=5242880 # the maximum size of messages fetched by a replica

log.segment.bytes=1073741824 # Kafka appends messages to segment files; when a file exceeds this size, Kafka rolls over to a new one

log.retention.check.interval.ms=300000 # every 300000 ms (5 minutes), check the retention configured above (log.retention.hours=168) and scan the directory for expired messages; delete any that are found

log.cleaner.enable=false # whether to enable log compaction; generally not needed, though enabling it can improve performance

zookeeper.connect=192.168.11.139:12181,192.168.11.140:12181,192.168.11.141:12181 # the Zookeeper connection string

The above explains the parameters; the actual modifications are:
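A sketch of the edited lines on server1, assembled from the parameter list above (broker.id and host.name must differ per server; treat the IPs as assumptions matching the zookeeper.connect line):

broker.id=0                # 1 on server2, 2 on server3
port=19092
host.name=192.168.11.139   # each broker's own IP
log.dirs=/opt/kafka/kafkalogs/
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
zookeeper.connect=192.168.11.139:12181,192.168.11.140:12181,192.168.11.141:12181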

This completes the configuration file changes.

4. Start the Kafka cluster and test it

1. Start the service
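The start command for each server, assuming the install path used above (-daemon runs the broker in the background):

cd /opt/kafka/kafka_2.11-2.2.0
./bin/kafka-server-start.sh -daemon config/server.properties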

2. Check whether the service started

# execute the command jps
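A sample check (the process ID will differ):

jps
# the output should now also include the Kafka broker process, e.g.:
# 19867 Kafka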

3. Create a Topic to verify whether the creation is successful

For more information, please see the official document: http://kafka.apache.org/documentation.html

# create a Topic (topic)
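A reconstruction of the create command for Kafka 2.2, using the options explained below (the zk address and port follow this article's setup; treat them as assumptions):

./bin/kafka-topics.sh --create --zookeeper 192.168.11.139:12181 --replication-factor 2 --partitions 1 --topic meinv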

# explanation
--replication-factor 2   # keep two replicas
--partitions 1           # create 1 partition
--topic meinv            # the topic name is meinv

Create a publisher on one server:

# create a producer (publisher)
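Using the console producer that ships with Kafka (the broker address and port follow this article's server.properties; adjust to one of your brokers):

./bin/kafka-console-producer.sh --broker-list 192.168.11.139:19092 --topic meinv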

Create a subscriber on another server:
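And the matching console consumer (in Kafka 2.2 the consumer connects to the brokers, not zk; --from-beginning replays the topic from the start):

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.139:19092 --topic meinv --from-beginning

Anything typed into the producer should then appear on the consumer.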

At this point, the service setup is complete.

5. Other explanatory notes

5.1 Log description

By default, Kafka's logs are saved in the /opt/kafka/kafka_2.11-2.2.0/logs directory. A few logs deserve attention:

server.log # Kafka's running log

state-change.log # Kafka uses Zookeeper to store its state, so state switches can occur; the switch log is saved here

controller.log # Kafka elects one node as the "controller". When a node goes down, the controller is responsible for electing new partition leaders among all surviving nodes, which lets Kafka manage the leader-follower relationships of all partition nodes in batch and efficiently. If the controller itself goes down, one of the surviving nodes switches to become the new controller.

5.2 When you are finished, you can log in to zk and check zk's directory tree.

# use the client to enter zk

cd /opt/zookeeper/zookeeper-3.4.14/bin

./zkCli.sh -server 192.168.11.139:12181 # normally the port can be omitted, but we changed it, so it must be given here

# check the directory and execute "ls /"

[zk: 127.0.0.1:12181(CONNECTED) 0] ls /

# display result: [consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]

In the listing above, only the zookeeper node is native to Zookeeper; everything else was created by Kafka.

# note an important one

[zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/1

{"jmx_port":-1,"timestamp":"1456125963355","endpoints":["PLAINTEXT://192.168.7.100:19092"],"host":"192.168.7.100","version":2,"port":19092}

cZxid = 0x1000001c1
ctime = Mon Feb 22 15:26:03 CST 2016
mZxid = 0x1000001c1
mtime = Mon Feb 22 15:26:03 CST 2016
pZxid = 0x1000001c1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x152e40aead20016
dataLength = 139
numChildren = 0
[zk: 127.0.0.1:12181(CONNECTED) 2]

# another useful check: partition info

[zk: 127.0.0.1:12181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0
null
cZxid = 0x100000029
ctime = Mon Feb 22 10:05:11 CST 2016
mZxid = 0x100000029
mtime = Mon Feb 22 10:05:11 CST 2016
pZxid = 0x10000002a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: 127.0.0.1:12181(CONNECTED) 8]
