
Kafka Configuration Properties

Updated 2025-01-18, from SLTechnology News&Howtos


This article walks through the configuration properties of Kafka. Many people have questions about these settings in day-to-day operation, so the properties are collected below, grouped by area (system, topic, ZooKeeper, log, replica, consumer, and producer), each with a short explanation and an example value.

## System ##

# Unique ID of this broker within the cluster; must be a non-negative integer.

broker.id=0

# Service port. The default is 9092.

port=9092

# Listening address. If not set, the broker listens on all addresses.

host.name=debugo01

# Maximum number of threads handling network requests

num.network.threads=2

# Number of threads performing disk I/O

num.io.threads=8

# Number of background threads

background.threads = 4

# Maximum number of requests queued for processing by the I/O threads

queued.max.requests = 500

# Socket send buffer (SO_SNDBUF)

socket.send.buffer.bytes=1048576

# Socket receive buffer (SO_RCVBUF)

socket.receive.buffer.bytes=1048576

# Maximum size of a socket request in bytes. To prevent out-of-memory errors, message.max.bytes must be smaller than this value.

socket.request.max.bytes = 104857600
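Property names are case-sensitive, so they must be written in lowercase exactly as Kafka defines them (broker.id, not Broker.id). As a rough illustration of the file format used throughout this article, here is a minimal Python sketch that reads "#" comments and key=value lines into a dictionary. The function name parse_properties is hypothetical, and the sketch covers only a small subset of the Java .properties format (no line continuations or escapes).

```python
def parse_properties(text):
    """Parse a minimal subset of the Java .properties format into a dict."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            # A bare key is treated as a key with an empty value.
            key, value = line, ""
        # Whitespace around the key and the value is ignored.
        props[key.strip()] = value.strip()
    return props


sample = """
# broker identity
broker.id=0
num.io.threads = 8
# log.flush.interval.messages=10000
"""
config = parse_properties(sample)
print(config)
```

Values come back as strings; a caller would convert them with int() where a number is expected.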

## Topic ##

# Default number of partitions per topic. More partitions result in more segment files.

num.partitions=2

# Whether topics may be created automatically. If false, topics must be created from the command line.

auto.create.topics.enable = true

# Default number of replicas for each partition of a topic. It cannot be greater than the number of brokers in the cluster.

default.replication.factor = 1

# Maximum size of a message body in bytes

message.max.bytes = 1000000
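Two of the comments above state constraints between settings: message.max.bytes must be smaller than socket.request.max.bytes, and default.replication.factor cannot exceed the number of brokers. The following Python sketch checks both before a broker is started. The function name check_broker_config and the broker count of 3 are assumptions made for illustration; this is not a Kafka tool.

```python
def check_broker_config(props, broker_count):
    """Return a list of human-readable problems found in the settings."""
    problems = []
    # A single message must fit inside one socket request.
    if int(props["message.max.bytes"]) >= int(props["socket.request.max.bytes"]):
        problems.append(
            "message.max.bytes must be smaller than socket.request.max.bytes"
        )
    # Each replica of a partition lives on a different broker.
    if int(props["default.replication.factor"]) > broker_count:
        problems.append(
            "default.replication.factor cannot exceed the number of brokers"
        )
    return problems


# Values taken from this article; broker_count=3 is an assumed cluster size.
article_values = {
    "message.max.bytes": "1000000",
    "socket.request.max.bytes": "104857600",
    "default.replication.factor": "1",
}
print(check_broker_config(article_values, broker_count=3))
```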

## ZooKeeper ##

# ZooKeeper quorum. Multiple servers are separated by commas.

zookeeper.connect=debugo01:2181,debugo02,debugo03

# Timeout for connecting to ZooKeeper (ms)

zookeeper.connection.timeout.ms=1000000

# How far a ZooKeeper follower may lag behind the ZooKeeper leader (ms)

zookeeper.sync.time.ms = 2000
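The zookeeper.connect value above gives a port only for the first host. A short Python sketch of how such a comma-separated string can be split into host and port pairs follows. The helper name parse_zk_connect is hypothetical, and the fallback to port 2181 for entries without a port reflects the ZooKeeper client's default as far as I know; treat it as an assumption.

```python
DEFAULT_ZK_PORT = 2181  # assumed default when an entry carries no port


def parse_zk_connect(value):
    """Split 'host1:port1,host2,...' into a list of (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().partition(":")
        servers.append((host, int(port) if port else DEFAULT_ZK_PORT))
    return servers


print(parse_zk_connect("debugo01:2181,debugo02,debugo03"))
```

Writing the port explicitly on every entry avoids relying on the default.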

## Log ##

# Directories in which log data is stored. Multiple directories are separated by commas.

log.dirs=/var/log/kafka

# Flush data to the log file once this many messages have accumulated. Default 10000

# log.flush.interval.messages=10000

# Force a flush once this much time (ms) has passed. A flush happens as soon as either log.flush.interval.ms or log.flush.interval.messages is reached. Default 3000ms

# log.flush.interval.ms=1000

# How often to check whether any log needs to be flushed (ms)

log.flush.scheduler.interval.ms = 3000

# Log cleanup policy (delete | compact)

log.cleanup.policy = delete

# Log retention time (in hours or minutes). The default is 7 days (168 hours). Older data is handled according to the cleanup policy. Cleanup is triggered by whichever limit, time or bytes, is reached first.

log.retention.hours=168

# Maximum number of bytes of log data to retain. Once this size is exceeded, the data is handled according to the cleanup policy.

# log.retention.bytes=1073741824

# Size of a log segment file. When a segment exceeds this size, new messages are appended to a new segment file (-1 means no limit).

log.segment.bytes=536870912

# Force a new segment once this much time has passed (24 * 7 hours)

log.roll.hours = 168

# How often log segment files are checked against the deletion settings (log.retention.hours or log.retention.bytes), in ms

log.retention.check.interval.ms=60000

# Whether to enable the log cleaner (log compaction)

log.cleaner.enable=false

# How long delete markers are retained in compacted logs (1 day, in ms)

log.cleaner.delete.retention.ms = 86400000

# Maximum size of the index file of a log segment (10 * 1024 * 1024 bytes)

log.index.size.max.bytes = 10485760

# Interval, in bytes of log data, between entries in the offset index. This generally does not need to be changed.

log.index.interval.bytes = 4096
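The retention comments above say that cleanup is triggered by whichever limit is reached first, time or size. The Python sketch below is a deliberately simplified model of that rule, not Kafka's actual implementation: segments older than the time limit are dropped, then the oldest remaining segments are dropped until the total fits the byte limit. The function name segments_to_delete and the sample segment ages are hypothetical.

```python
def segments_to_delete(segments, retention_hours, retention_bytes=-1):
    """segments: list of (age_hours, size_bytes), oldest first.

    Returns the indexes of segments a delete policy would remove
    in this simplified model. retention_bytes=-1 disables the size limit.
    """
    # Time limit: anything older than retention_hours goes.
    doomed = [i for i, (age, _) in enumerate(segments) if age > retention_hours]
    if retention_bytes >= 0:
        total = sum(size for i, (_, size) in enumerate(segments) if i not in doomed)
        # Size limit: drop the oldest survivors until the rest fits.
        for i, (_, size) in enumerate(segments):
            if total <= retention_bytes:
                break
            if i in doomed:
                continue
            doomed.append(i)
            total -= size
    return sorted(doomed)


SEGMENT = 536870912  # log.segment.bytes from this article
log = [(200, SEGMENT), (100, SEGMENT), (50, SEGMENT), (1, SEGMENT)]
print(segments_to_delete(log, retention_hours=168))
print(segments_to_delete(log, retention_hours=168, retention_bytes=1073741824))
```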

## Replica ##

# Socket timeout for communication between the partition controller and the replicas (ms)

controller.socket.timeout.ms = 30000

# Size of the message queue for controller-to-broker channels

controller.message.queue.size=10

# Maximum time a replica may take to respond to the leader. A replica that exceeds this time is removed from the in-sync set.

replica.lag.time.max.ms = 10000

# Whether controlled shutdown of the broker is enabled. If true, leadership of all partitions led by this broker is transferred to other brokers before it shuts down.

controlled.shutdown.enable = false

# Number of controlled shutdown attempts

controlled.shutdown.max.retries = 3

# Interval between shutdown attempts (ms)

controlled.shutdown.retry.backoff.ms = 5000

# If a replica falls too far behind, it is considered out of sync. Replication always lags somewhat because of network delay and similar factors, but if a replica is seriously behind, the leader assumes that it has high network latency or limited throughput. In environments with few brokers or limited network capacity, it is recommended to increase this value.

replica.lag.max.messages = 4000

# Socket timeout between leader and replicas (30 * 1000 ms)

replica.socket.timeout.ms = 30000

# Socket receive buffer size for replication from the leader (64 * 1024 bytes)

replica.socket.receive.buffer.bytes = 65536

# Maximum number of bytes a replica fetches at a time (1024 * 1024 bytes)

replica.fetch.max.bytes = 1048576

# Maximum time a replica waits for a fetch from the leader. If the fetch fails, it is retried.

replica.fetch.wait.max.ms = 500

# Minimum amount of data for each fetch. If the leader does not yet have this much unreplicated data, the fetch waits until it does.

replica.fetch.min.bytes = 1

# Number of threads used to replicate from a leader. Increasing this number increases the I/O parallelism of the replicas.

num.replica.fetchers = 1

# Interval at which each replica flushes its high watermark to disk (ms)

replica.high.watermark.checkpoint.interval.ms = 5000

# Whether to automatically rebalance partition leadership across brokers

auto.leader.rebalance.enable = false

# Leader imbalance ratio (percent) allowed per broker. If it is exceeded, leadership is rebalanced.

leader.imbalance.per.broker.percentage = 10

# Interval at which leader imbalance is checked (seconds)

leader.imbalance.check.interval.seconds = 300

# Maximum size of the metadata a client may store with an offset (bytes)

offset.metadata.max.bytes = 1024
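The two lag settings in this section, replica.lag.time.max.ms and replica.lag.max.messages, both decide whether a replica still counts as in sync. The following Python sketch shows how the two thresholds combine. The function name is_in_sync is hypothetical, and the model is simplified: it only mirrors the rule described in the comments above.

```python
def is_in_sync(ms_since_last_fetch, messages_behind,
               lag_time_max_ms=10000, lag_max_messages=4000):
    """A replica stays in sync only if it is within BOTH limits."""
    responsive = ms_since_last_fetch <= lag_time_max_ms
    caught_up = messages_behind <= lag_max_messages
    return responsive and caught_up


print(is_in_sync(500, 100))    # recent fetch, small lag
print(is_in_sync(15000, 0))    # silent for longer than replica.lag.time.max.ms
print(is_in_sync(500, 5000))   # more than replica.lag.max.messages behind
```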

## Consumer ##

# The core consumer settings are group.id and zookeeper.connect.

# Unique group ID. Multiple processes that set the same group ID all belong to the same consumer group.

group.id

# Consumer ID. If not set, it is generated automatically.

consumer.id

# An ID used for tracing and troubleshooting, preferably the same as group.id

client.id =

# ZooKeeper cluster to use. This must be the same ZooKeeper configuration as the broker's.

zookeeper.connect=debugo01:2181,debugo02:2181,debugo03:2181

# ZooKeeper session (heartbeat) timeout. A consumer that sends no heartbeat within this time is considered dead.

zookeeper.session.timeout.ms = 6000

# Maximum time to wait while connecting to ZooKeeper (ms)

zookeeper.connection.timeout.ms = 6000

# How far a ZooKeeper follower may lag behind the ZooKeeper leader (ms)

zookeeper.sync.time.ms = 2000

# What to do when there is no initial offset in ZooKeeper, or when the offset is out of range.

# smallest: reset to the smallest offset

# largest: reset to the largest offset

# anything else: throw an exception to the consumer

auto.offset.reset = largest
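The three auto.offset.reset cases listed above can be sketched as a small Python function. The name resolve_offset and its parameters are hypothetical and only model the decision described in the comments; they are not part of any Kafka client API.

```python
def resolve_offset(committed, earliest, latest, auto_offset_reset="largest"):
    """Pick the offset a consumer starts from.

    committed is the offset stored in ZooKeeper, or None if there is none.
    earliest/latest bound the offsets currently available in the partition.
    """
    # A stored offset that is still in range is used as-is.
    if committed is not None and earliest <= committed <= latest:
        return committed
    if auto_offset_reset == "smallest":
        return earliest
    if auto_offset_reset == "largest":
        return latest
    # Any other value: report the problem to the consumer.
    raise ValueError("no valid offset and no reset policy")


print(resolve_offset(42, earliest=0, latest=100))
print(resolve_offset(None, earliest=0, latest=100, auto_offset_reset="smallest"))
print(resolve_offset(500, earliest=0, latest=100))
```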

# Socket timeout (30 * 1000 ms). The effective timeout is fetch.wait.max.ms + socket.timeout.ms.

socket.timeout.ms = 30000

# Socket receive buffer size (64 * 1024 bytes)

socket.receive.buffer.bytes = 65536

# Maximum number of bytes of messages fetched from each partition per request (1024 * 1024 bytes)

fetch.message.max.bytes = 1048576

# If true, the consumer periodically commits the offsets of consumed messages to ZooKeeper, so that when the consumer fails, a new consumer can read the latest committed offset from ZooKeeper.

auto.commit.enable = true

# Interval between automatic commits (60 * 1000 ms)

auto.commit.interval.ms = 60000

# Maximum number of message chunks buffered for consumption. Each chunk can be as large as fetch.message.max.bytes.

queued.max.message.chunks = 10

# When a new consumer joins the group, a rebalance is attempted and some partitions are migrated to the new consumer. This setting is the maximum number of attempts.

rebalance.max.retries = 4

# Interval between rebalance attempts (ms)

rebalance.backoff.ms = 2000

# Time to wait before refreshing the leader of a partition that has just lost its leader

refresh.leader.backoff.ms

# Minimum amount of data the server returns to the consumer. If less is available, the request waits until this much has accumulated. The default of 1 returns data as soon as any is available.

fetch.min.bytes = 1

# Maximum time a fetch request waits when fetch.min.bytes is not yet satisfied (ms)

fetch.wait.max.ms = 100
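fetch.min.bytes and fetch.wait.max.ms work as a pair: the server answers a fetch as soon as enough data is available, or when the wait time runs out, whichever comes first. A minimal Python sketch of that decision, assuming the hypothetical helper name should_respond:

```python
def should_respond(bytes_available, waited_ms,
                   fetch_min_bytes=1, fetch_wait_max_ms=100):
    """Return True when the server would answer the fetch request."""
    enough_data = bytes_available >= fetch_min_bytes
    waited_long_enough = waited_ms >= fetch_wait_max_ms
    return enough_data or waited_long_enough


print(should_respond(bytes_available=0, waited_ms=10))     # keep waiting
print(should_respond(bytes_available=1, waited_ms=10))     # data arrived
print(should_respond(bytes_available=0, waited_ms=100))    # wait time used up
```

Raising fetch.min.bytes trades latency for fewer, larger responses, with fetch.wait.max.ms as the upper bound on the added delay.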

# If no new message becomes available for consumption within this time, an exception is thrown. The default of -1 means there is no limit.

consumer.timeout.ms = -1

## Producer ##

# Core configuration includes:

# metadata.broker.list

# request.required.acks

# producer.type

# serializer.class

# Broker addresses from which the producer fetches metadata (topics, partitions, and replicas). The format is host1:port1,host2:port2. A VIP in front of the brokers can also be used.

metadata.broker.list

# Acknowledgement mode for messages

# 0: do not wait for any acknowledgement. Sending has low latency, but messages can be lost if a server fails (fire-and-forget, somewhat like UDP).

# 1: send the message and wait for the leader to acknowledge receipt, for a certain degree of reliability

# -1: send the message and wait until the leader has received it and it has been replicated before returning; the highest reliability

request.required.acks = 0

# Maximum time to wait for a send request to be acknowledged (ms)

request.timeout.ms = 10000

# Socket send buffer size (100 * 1024 bytes)

send.buffer.bytes = 102400

# Serializer for keys. If not set, it is the same as serializer.class.

key.serializer.class

# Partitioning strategy. The default takes the hash of the key modulo the number of partitions.

partitioner.class=kafka.producer.DefaultPartitioner
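The default partitioning strategy described above is a hash of the key taken modulo the number of partitions. The Python sketch below illustrates the idea only: it uses zlib.crc32 as a stand-in hash, which is an assumption for the sake of a deterministic example and not the hash that kafka.producer.DefaultPartitioner uses. The function name choose_partition is hypothetical.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index in [0, num_partitions)."""
    # zlib.crc32 is a stand-in hash chosen for determinism in this sketch.
    return zlib.crc32(key) % num_partitions


# With num.partitions=2, every key lands on partition 0 or 1,
# and the same key always lands on the same partition.
for key in (b"user-1", b"user-2", b"user-3"):
    print(key, choose_partition(key, 2))
```

Because the mapping depends on the partition count, changing the number of partitions changes where existing keys are routed.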

# Message compression codec. The default is none. gzip and snappy are also available.

compression.codec = none

# Compression can be restricted to specific topics

compressed.topics=null

# Number of retries after a failed send

message.send.max.retries = 3

# Interval between retries after a failure (ms)

retry.backoff.ms = 100

# Interval at which the producer refreshes topic metadata (600 * 1000 ms). If set to 0, the metadata is refreshed after every message sent.

topic.metadata.refresh.interval.ms = 600000

# Chosen freely by the user, but must not be duplicated. It is mainly used to trace and log messages.

client.id = ""

# Maximum time to buffer data in asynchronous mode. For example, if set to 100, messages arriving within 100 ms are collected and sent together. This increases throughput but also increases send latency.

queue.buffering.max.ms = 5000

# Maximum number of messages buffered in asynchronous mode

queue.buffering.max.messages = 10000

# How long a message waits to enter the queue in asynchronous mode. If set to 0, the message does not wait: if it cannot enter the queue, it is dropped immediately. A value of -1 blocks until there is room.

queue.enqueue.timeout.ms = -1

# Number of messages sent per batch in asynchronous mode. The producer sends a batch when batch.num.messages is reached or when queue.buffering.max.ms has elapsed.

batch.num.messages=200
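In asynchronous mode a batch goes out either when it is full (batch.num.messages) or when the oldest buffered message has waited long enough (queue.buffering.max.ms). The following Python sketch models that trigger with an explicit clock so that it can be run deterministically. The class name AsyncBatcher and its methods are hypothetical and do not correspond to the Kafka producer's internals.

```python
class AsyncBatcher:
    """Toy model of count-or-time batch flushing."""

    def __init__(self, batch_num_messages=200, queue_buffering_max_ms=5000):
        self.batch_num_messages = batch_num_messages
        self.queue_buffering_max_ms = queue_buffering_max_ms
        self.pending = []
        self.first_enqueued_ms = None

    def add(self, message, now_ms):
        """Buffer a message; return a batch if a trigger fired, else None."""
        if not self.pending:
            # Remember when the oldest message of this batch arrived.
            self.first_enqueued_ms = now_ms
        self.pending.append(message)
        return self.poll(now_ms)

    def poll(self, now_ms):
        """Check both triggers without adding a message."""
        if not self.pending:
            return None
        full = len(self.pending) >= self.batch_num_messages
        expired = now_ms - self.first_enqueued_ms >= self.queue_buffering_max_ms
        if full or expired:
            batch, self.pending = self.pending, []
            return batch
        return None


batcher = AsyncBatcher(batch_num_messages=3, queue_buffering_max_ms=5000)
results = [batcher.add(m, t) for m, t in (("a", 0), ("b", 10), ("c", 20))]
print(results)
```

A small batch size keeps latency low, while a larger one with a longer buffering time favors throughput.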

That concludes this overview of Kafka configuration properties. Theory is best learned together with practice, so try these settings out in your own environment.
