1. Kafka usage background
When we use many distributed databases and distributed computing clusters, we often run into problems like these:
We want to analyze user behavior (pageviews) so that we can design better advertising space.
We want to collect statistics on users' search keywords and analyze current trends.
Some data would be wasteful to keep in a database, yet writing it directly to disk is inefficient.
All these scenarios have one thing in common:
Data is produced by an upstream module and consumed by a downstream module for computation, statistics, and analysis. This is exactly where a message system, and especially a distributed message system, comes in!
2. The definition of Kafka
What is Kafka: it is a distributed messaging system written by LinkedIn in Scala and used as the basis for LinkedIn's activity stream (Activity Stream) and operational data processing pipeline (Pipeline). It features horizontal scalability and high throughput.
3. Comparison between Kafka and other mainstream distributed messaging systems.
Explanation of the terms used in the comparison:
1. Both Java and Scala are languages that run on the JVM.
2. Erlang, like the recently popular Go language, supports high concurrency at the language level, so RabbitMQ has strong concurrency performance out of the box. However, RabbitMQ implements AMQP strictly, which imposes many restrictions. Kafka is designed for high throughput, so it uses a high-performance but non-standard protocol that is merely modeled on AMQP (the Advanced Message Queuing Protocol).
3. The concept of a transaction: in a database, multiple operations are committed together; either all succeed or all fail. A bank transfer, with its payment and receipt, is a classic example: the transfer counts as successful only when your money is sent and the other party's bank receives it normally; if either side fails, the whole operation is a failure.
In a message queue, the corresponding meaning is that multiple messages are sent together and either all succeed or all fail. Of the three systems, only ActiveMQ supports this, because RabbitMQ and Kafka gave up transaction support in exchange for higher performance.
4. Cluster: a whole composed of multiple servers is called a cluster, and it is transparent to producers and consumers; adding a server to the messaging cluster or removing one goes unnoticed by both producers and consumers.
5. Load balancing: for a messaging system, load balancing means that when a large number of producers and consumers send requests to the system, those requests must be spread evenly across its servers, rather than falling en masse on one or a few servers, leaving them overloaded and, in serious cases, making them stop serving or crash.
6. Dynamic scaling is a capability many companies require; without it, scaling the cluster means stopping the service, which is unacceptable to many companies.
Note:
Alibaba's MetaQ and RocketMQ both show Kafka's influence: they either modified Kafka or borrowed from its design. Kafka's dynamic scaling, finally, is implemented through Zookeeper.
Zookeeper is a cluster widely used in distributed systems for distributed state management, distributed coordination, distributed configuration management, and distributed lock services. Adding or removing a kafka server triggers corresponding events on Zookeeper nodes; the kafka system captures these events and performs a new round of load balancing, and clients capture them too and perform their own rebalancing.
II. Kafka related concepts
1. The AMQP protocol
The Advanced Message Queuing Protocol (AMQP) is an open standard application-layer protocol for message-oriented middleware (Message Oriented Middleware). AMQP defines the format of the byte streams sent over the network, so compatibility is excellent: any program that implements AMQP can interact with any other AMQP-compatible program, making cross-language and cross-platform integration easy.
The three popular message queue products mentioned above either support the AMQP protocol or borrow from its ideas in their design and implementation.
2. Some basic concepts
1. Consumer (Consumer): the client application that requests messages from the message queue.
2. Producer (Producer): the application that publishes messages to the broker.
3. AMQP server (broker): receives the messages sent by producers and routes them to queues inside the server. Kafka appends the messages sent by producers to disk and assigns each message an offset, so for kafka a broker is simply a running instance of the application.
Client languages supported by kafka
Kafka client supports most of the current mainstream languages, including: C, C++, Erlang, Java, .net, perl, PHP, Python, Ruby, Go, Javascript
You can use any of the languages above to communicate with kafka servers, that is, to write your own consumer that subscribes to messages from the kafka cluster, or to write your own producer program.
3. Kafka architecture
The overall structure is: producers produce messages, the kafka cluster stores them, and consumers fetch messages.
Messages in the kafka cluster are organized by Topic (topic).
Some basic concepts:
1. Topic: a topic is similar to the categories in news, such as sports, entertainment, and education; in practical engineering there is usually one topic per business.
2. Partition: the message data in a Topic is organized into multiple partitions. The partition is the smallest unit of organization in the kafka message queue; a partition can be regarded as a FIFO queue (FIFO is short for First In First Out).
Kafka's partitioning is the key to its performance: when you find your cluster's performance lacking, the common remedy is to increase a Topic's partitions, as in the sketch below. Messages within a partition are kept in order from oldest to newest; consumers subscribe to messages from the head of the queue, and producers append messages to its tail.
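For example, with the command-line tool that ships with kafka (a sketch; the topic name mytopic and the zookeeper address server1:12181 are placeholders matching the cluster built later in this article):
# raise an existing topic's partition count from 1 to 4
./kafka-topics.sh --alter --zookeeper server1:12181 --topic mytopic --partitions 4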
Replication (backup): to ensure reliability in a distributed setting, kafka began, in version 0.8, to replicate each partition's data to other Brokers, so that one Broker going down does not make the data on its partitions unavailable.
From 0.7 to 0.8, kafka changed significantly: 1. replication was added; 2. the concept of a controller node was introduced, along with the election of a cluster leader.
III. Building the Zookeeper cluster
The kafka cluster keeps its state in Zookeeper, so the first step is to set up the Zookeeper cluster.
1. Software environment (3 servers for testing; use an odd number of servers)
vim /etc/hosts (all 3 servers need these entries)
192.168.11.128 server1
192.168.11.129 server2
192.168.11.130 server3
1. Use 1, 3, 5, ... (2n+1) Linux servers. A Zookeeper cluster needs more than half of its nodes alive to provide service, so with three servers one is allowed to fail. An even number is unnecessary:
with four servers, one failure leaves three and service continues, but a second failure breaks the more-than-half rule, so four tolerate no more failures than three. Remember: it must be more than half.
2. Java JDK 1.8. Zookeeper is written in Java, so it needs a Java environment; Java code runs on the Java virtual machine.
3. A stable release of Zookeeper: Zookeeper 3.4.14.
2. Configure and install zookeeper
Perform the following operations identically on all three servers.
1. Install java (here I install from the JDK tarball)
First prepare the jdk package and decompress it to /usr/local.
Create a soft link.
Write the java environment variables:
vim /etc/profile.d/aa.sh
Load the environment variables:
source /etc/profile (loads all environment variables) or source /etc/profile.d/aa.sh (loads just this one file)
Check whether java was installed successfully:
If java -version prints the JDK version, the java environment has been deployed successfully.
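A consolidated sketch of the steps above; the tarball and directory names are hypothetical, so substitute your actual JDK 8 package:
tar xf jdk-8u211-linux-x64.tar.gz -C /usr/local   # hypothetical package name
ln -s /usr/local/jdk1.8.0_211 /usr/local/jdk      # soft link with a stable path
cat > /etc/profile.d/aa.sh <<'EOF'
export JAVA_HOME=/usr/local/jdk
export PATH=$JAVA_HOME/bin:$PATH
EOF
source /etc/profile.d/aa.sh
java -version   # should print the JDK version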
Alternatively, install java with yum:
yum list java* && yum -y install java
2. Download Zookeeper (3 servers operate uniformly)
First, note that in production the directory structure should be laid out deliberately, so that projects can still be found when there are many of them.
# the directories go under /opt
# first create the Zookeeper project directory
cd /opt
mkdir zookeeper            # project directory
mkdir zookeeper/zkdata     # stores snapshot logs
mkdir zookeeper/zkdatalog  # stores transaction logs
Download Zookeeper:
# download the software
cd /opt/zookeeper/
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
# decompress the software
tar xf zookeeper-3.4.14.tar.gz
3. Modify the configuration file
Enter the conf directory inside the extracted directory and take a look.
# zoo_sample.cfg is the official zookeeper template file; make a copy of it named zoo.cfg, since zoo.cfg is the file name zookeeper officially expects.
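The copy is a single command:
cp zoo_sample.cfg zoo.cfg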
Configuration file for the 3 servers:
vim zoo.cfg
The dataDir and clientPort lines that the template defines must be commented out and replaced, as in the sample below; otherwise the cluster will report errors later.
# in server.1, the 1 is this server's ID (other numbers also work); it indicates which server this is and is used to identify it. The same ID must be written into the myid file under the snapshot directory.
# after the = comes the server's IP in the cluster. The first port is the communication port between master and followers, 2888 by default; the second is the leader-election port, 3888 by default, used when the cluster first starts or after the leader dies and a new election is held.
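A sketch of the resulting zoo.cfg, assuming the directories created above, the hostnames from /etc/hosts, and the 12181 client port used later in this article; initLimit and syncLimit match the explanation below:
tickTime=2000
initLimit=5
syncLimit=5
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zkdatalog
clientPort=12181
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888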
The configuration file explained:
# tickTime:
The basic time unit used between Zookeeper servers, or between clients and servers, to maintain heartbeats; one heartbeat is sent every tickTime.
# initLimit:
This item sets the maximum number of heartbeat intervals Zookeeper tolerates while a connection is being initialized (the client here is not a user client connecting to the Zookeeper server, but a Follower server in the Zookeeper cluster connecting to the Leader). If the Zookeeper server has not received a reply after more than 5 heartbeats (that is, 5 tickTime), the connection attempt is considered failed. The total timeout is 5*2000 ms = 10 seconds.
# syncLimit:
This item bounds the time for requests and replies exchanged between Leader and Follower, expressed as a maximum number of tickTime intervals; the total allowed time is 5*2000 ms = 10 seconds.
# dataDir:
The storage path for snapshot logs.
# dataLogDir:
The storage path for transaction logs. If this is not configured, transaction logs are stored by default in the directory given by dataDir, which seriously hurts zk's performance: when zk's throughput is high, it generates a great many transaction logs and snapshot logs.
# clientPort:
The port on which clients connect to the Zookeeper server; Zookeeper listens on this port and accepts clients' access requests. This article changes it to a larger value, 12181.
Create the myid files (different on each server):
# server1
echo "1" > /opt/zookeeper/zkdata/myid
# server2
echo "2" > /opt/zookeeper/zkdata/myid
# server3
echo "3" > /opt/zookeeper/zkdata/myid
4. Important configuration instructions
1. The myid file, kept in the snapshot directory, identifies this server and matches the server.X entries in zoo.cfg; it is an important identity the whole zk cluster uses to discover its members.
2. The zoo.cfg file in the conf directory is the zookeeper configuration file.
3. The log4j.properties file in the conf directory configures zk's log output. Programs written in Java have one thing in common here: logging is managed through log4j.
4. The zkEnv.sh and zkServer.sh files:
zkServer.sh is the main management script;
zkEnv.sh is the main configuration script, setting the environment variables used when the zookeeper cluster starts.
5. One more thing to pay attention to:
With the default configuration, the ZooKeeper server does not remove old snapshots and log files (see the autopurge options); this is the responsibility of the operator.
However, you can clean them up regularly with a command, for example the script below.
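A sketch of such a script, assuming Zookeeper's default version-2 subdirectories under the data and log paths configured above:
#!/bin/bash
# clean_zklog.sh: keep only the newest $count snapshot and transaction log files
snapDir=/opt/zookeeper/zkdata/version-2
logDir=/opt/zookeeper/zkdatalog/version-2
count=66
count=$((count + 1))
# ls -t lists newest first; tail -n +$count skips the first 66 and deletes the rest
ls -t $logDir/log.* | tail -n +$count | xargs rm -f
ls -t $snapDir/snapshot.* | tail -n +$count | xargs rm -f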
The script above deletes the files in the two directories, keeping only the latest 66 in each. You can put it in crontab, scheduled to run once a day at 2:00 a.m.:
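For example, assuming the script above is saved as /opt/zookeeper/clean_zklog.sh:
0 2 * * * /bin/bash /opt/zookeeper/clean_zklog.sh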
5. Start the service and check it
1. Start the service
# go to the bin directory of Zookeeper
cd /opt/zookeeper/zookeeper-3.4.14/bin
# start the service (required on all 3 servers)
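The stock start command, run on each server:
./zkServer.sh start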
2. Check the service status
# check the server status (there will be one leader and two followers)
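A sketch of the check; output details vary slightly by environment:
./zkServer.sh status
# sample output on the elected leader (the other two report Mode: follower):
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader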
Generally, a zk cluster has exactly one leader and several followers. The leader handles clients' read and write requests, the followers synchronize data from it, and when the leader dies a new leader is elected from among the followers.
You can use jps to check zk's process; QuorumPeerMain is the main class of the whole zk project.
# execute the command jps
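Sample output (the PIDs will differ); QuorumPeerMain is zk's main class:
20348 Jps
4233 QuorumPeerMain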
IV. Building the kafka cluster
1. Software environment
1. Linux servers, one or more; at least 2 for a cluster (this article uses 3).
2. The Zookeeper cluster built above.
3. Software version: kafka_2.11-2.2.0.tgz (the package downloaded below).
2. Create a directory and download the installation software (3 servers)
# create a directory
cd /opt
mkdir kafka      # create the project directory
cd kafka
mkdir kafkalogs  # create the kafka message directory, where kafka's messages are mainly stored
# download the software
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
# decompress the software
tar -zxvf kafka_2.11-2.2.0.tgz
3. Modify the configuration file
Enter the config directory:
cd /opt/kafka/kafka_2.11-2.2.0/config/
The file of main concern is server.properties, which can be found in this directory:
There are many files here, and among them you will find Zookeeper's own configuration files: kafka can be started with the zk bundled inside it, but a separate zk cluster is recommended.
Modify the configuration file
broker.id=0  # the unique ID of this machine in the cluster, analogous in nature to zookeeper's myid
port=19092  # the port kafka serves on; the default is 9092
host.name=192.168.7.100  # commented out by default; in 0.8.1 there was a bug involving DNS resolution and failure rates
num.network.threads=3  # the number of threads the broker uses for network processing
num.io.threads=8  # the number of threads the broker uses for disk I/O
log.dirs=/opt/kafka/kafkalogs/  # the directory where messages are stored; it may be a comma-separated list of paths. num.io.threads above should not be less than the number of directories; when several directories are configured, a newly created topic persists its messages in whichever of them currently holds the fewest partitions
socket.send.buffer.bytes=102400  # send buffer size; data is not sent immediately but buffered until it reaches a certain size, which improves performance
socket.receive.buffer.bytes=102400  # kafka's receive buffer size; data is serialized to disk once it reaches a certain size
socket.request.max.bytes=104857600  # the maximum size of a request sent to kafka; this value must not exceed the JVM heap size
num.partitions=1  # the default number of partitions for a topic
log.retention.hours=168  # the default maximum retention time for messages: 168 hours, i.e. 7 days
message.max.bytes=5242880  # the maximum message size kafka will save: 5MB
default.replication.factor=2  # the number of replicas kafka keeps of each message; if one replica fails, another can continue to serve
replica.fetch.max.bytes=5242880  # the maximum number of bytes fetched per replica request
log.segment.bytes=1073741824  # kafka appends messages to segment files; when a file exceeds this size, kafka rolls a new one
log.retention.check.interval.ms=300000  # every 300000 ms, check whether log segments have outlived the retention time configured above (log.retention.hours=168) and delete any that have expired
log.cleaner.enable=false  # whether to enable log compaction; generally not needed, though enabling it can improve performance
zookeeper.connect=192.168.11.139:12181,192.168.11.140:12181,192.168.11.141:12181  # the zookeeper connection addresses and ports
The above explains the parameters; the actual modifications are as follows.
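A sketch of the changed lines, consistent with the explanations above; broker.id and host.name must differ per server, and all IPs here are this article's examples, so substitute your own:
broker.id=0                 # use 1 and 2 on the other two servers
port=19092
host.name=192.168.11.128    # each server's own IP
log.dirs=/opt/kafka/kafkalogs/
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
zookeeper.connect=192.168.11.139:12181,192.168.11.140:12181,192.168.11.141:12181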
This completes the configuration file changes.
4. Start the kafka cluster and test
1. Start the service
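On all 3 servers, from kafka's bin directory (-daemon runs the broker in the background):
cd /opt/kafka/kafka_2.11-2.2.0/bin
./kafka-server-start.sh -daemon ../config/server.properties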
2. Check whether the service started
# execute the command jps
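Sample output (PIDs will differ); the Kafka process should appear alongside zk's QuorumPeerMain:
20348 Jps
4233 QuorumPeerMain
18991 Kafka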
3. Create a Topic and verify that creation succeeded
For more information, please see the official document: http://kafka.apache.org/documentation.html
# create a Topic (topic)
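A sketch of the command, using the options explained below; the zookeeper address follows this article's zookeeper.connect example:
./kafka-topics.sh --create --zookeeper 192.168.11.139:12181 --replication-factor 2 --partitions 1 --topic meinv
# verify that the topic was created
./kafka-topics.sh --list --zookeeper 192.168.11.139:12181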
# explanation of the options
--replication-factor 2   # keep two copies of the data
--partitions 1           # create 1 partition
--topic meinv            # the topic name is meinv
Create a publisher on one server:
# create a publisher (producer) that connects to the broker
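A sketch using the console producer that ships with kafka; the broker address is an example and the 19092 port follows the configuration above:
./kafka-console-producer.sh --broker-list 192.168.11.128:19092 --topic meinv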
Create a subscriber on another server:
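A sketch using the console consumer; in kafka 2.2 it connects to a broker rather than to zookeeper, and the address is again an example:
./kafka-console-consumer.sh --bootstrap-server 192.168.11.128:19092 --topic meinv --from-beginning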
At this point, the cluster setup is complete.
5. Other explanatory notes
5.1. Log description
By default, kafka's logs are saved in the /opt/kafka/kafka_2.11-2.2.0/logs directory. A few logs deserve attention:
server.log  # kafka's runtime log
state-change.log  # kafka keeps its state in zookeeper, so leader switches can occur; the switch logs are saved here
controller.log  # kafka elects one node as the 'controller'; when a node goes down, the controller is responsible for electing a new leader among the surviving replicas of the affected partitions, which lets kafka manage the master-slave relationships of all partitions in batch and efficiently. If the controller itself goes down, one of the surviving nodes switches to become the new controller.
5.2. When you are finished, you can log in to zk and inspect zk's directory tree.
# enter zk using the client
cd /opt/zookeeper/zookeeper-3.4.14/bin
./zkCli.sh -server 192.168.11.139:12181  # by default the '-server host:port' argument could be omitted, but we changed zk's port
# check the directory: execute 'ls /'
[zk: 127.0.0.1:12181(CONNECTED) 0] ls /
# displayed result: [consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]
Of the nodes shown above, only zookeeper is native to Zookeeper itself; all the rest were created by Kafka.
# note one important entry
[zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1456125963355","endpoints":["PLAINTEXT://192.168.7.100:19092"],"host":"192.168.7.100","version":2,"port":19092}
cZxid = 0x1000001c1
ctime = Mon Feb 22 15:26:03 CST 2016
mZxid = 0x1000001c1
mtime = Mon Feb 22 15:26:03 CST 2016
pZxid = 0x1000001c1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x152e40aead20016
dataLength = 139
numChildren = 0
[zk: 127.0.0.1:12181(CONNECTED) 2]
# another useful check is a partition's state
[zk: 127.0.0.1:12181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0
null
cZxid = 0x100000029
ctime = Mon Feb 22 10:05:11 CST 2016
mZxid = 0x100000029
mtime = Mon Feb 22 10:05:11 CST 2016
pZxid = 0x10000002a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: 127.0.0.1:12181(CONNECTED) 8]