2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article goes through the configuration properties of Kafka. Many people have questions about how to set them in day-to-day operation, so the settings below are grouped by role (broker, consumer, producer) and each one is annotated with a short explanation.
## System ##
# unique ID of this broker in the cluster; must be a non-negative integer
broker.id=0
# service port; default is 9092
port=9092
# listening address; if not set, the broker binds to all addresses
host.name=debugo01
# number of threads that handle network requests
num.network.threads=2
# number of threads that perform disk I/O
num.io.threads=8
# number of background threads
background.threads=4
# maximum number of requests that can queue up waiting for the I/O threads
queued.max.requests=500
# socket send buffer (SO_SNDBUF)
socket.send.buffer.bytes=1048576
# socket receive buffer (SO_RCVBUF)
socket.receive.buffer.bytes=1048576
# maximum number of bytes in a socket request; to prevent running out of memory, message.max.bytes must be smaller than this value
socket.request.max.bytes=104857600
## Topic ##
# default number of partitions per topic; more partitions result in more segment files
num.partitions=2
# whether topics may be created automatically; if false, topics must be created with the command-line tool
auto.create.topics.enable=true
# default replication factor for a topic's partitions; cannot be greater than the number of brokers in the cluster
default.replication.factor=1
# maximum size of a message body, in bytes
message.max.bytes=1000000
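The size and replication constraints mentioned in the comments so far can be checked before a broker is started. The following is a minimal sketch and not part of Kafka: `parse_properties` and `check_broker_limits` are hypothetical helper names, the parser only understands plain `key=value` lines, and the fallback values are simply the ones shown in this listing.

```python
def parse_properties(text):
    """Parse simple key=value lines; '#' comments and blank lines are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_broker_limits(props, broker_count):
    """Return a list of problems found (empty if the settings look consistent)."""
    problems = []
    # Fallbacks are the values from the listing, used here as an assumption.
    msg_max = int(props.get("message.max.bytes", 1000000))
    req_max = int(props.get("socket.request.max.bytes", 104857600))
    if msg_max >= req_max:
        problems.append("message.max.bytes must be smaller than socket.request.max.bytes")
    replication = int(props.get("default.replication.factor", 1))
    if replication > broker_count:
        problems.append("default.replication.factor exceeds the number of brokers")
    return problems


sample = """
# values taken from the listing
socket.request.max.bytes=104857600
message.max.bytes=1000000
default.replication.factor=1
"""
print(check_broker_limits(parse_properties(sample), broker_count=1))
```

With the values from the listing and a single broker, the check reports no problems; raising `default.replication.factor` above the broker count would add an entry to the returned list.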
## ZooKeeper ##
# ZooKeeper quorum; separate multiple hosts with commas
zookeeper.connect=debugo01:2181,debugo02,debugo03
# timeout (ms) for connecting to ZooKeeper
zookeeper.connection.timeout.ms=1000000
# how far (ms) a ZooKeeper follower may lag behind the ZooKeeper leader
zookeeper.sync.time.ms=2000
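The `zookeeper.connect` value is a comma-separated list of `host:port` entries, and in the example only the first host carries an explicit port. The sketch below shows one way to split such a string. It assumes that hosts without a port fall back to 2181, as ZooKeeper's own connect-string handling does; `parse_zk_connect` is a hypothetical helper, and it does not handle an optional chroot path suffix or IPv6 addresses.

```python
DEFAULT_ZK_PORT = 2181  # assumption: fallback used when a host has no explicit port


def parse_zk_connect(connect):
    """Split 'host1:port1,host2,...' into a list of (host, port) tuples."""
    servers = []
    for entry in connect.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.partition(":")
        servers.append((host, int(port) if sep else DEFAULT_ZK_PORT))
    return servers


print(parse_zk_connect("debugo01:2181,debugo02,debugo03"))
# [('debugo01', 2181), ('debugo02', 2181), ('debugo03', 2181)]
```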
## Log ##
# directories where log data is stored; separate multiple directories with commas
log.dirs=/var/log/kafka
# flush data to the log file once this many messages have accumulated; default 10000
# log.flush.interval.messages=10000
# force a flush once this much time (ms) has passed; a flush happens as soon as either interval.ms or interval.messages is reached; default 3000 ms
# log.flush.interval.ms=1000
# interval (ms) at which the flusher checks whether any log needs to be flushed
log.flush.scheduler.interval.ms=3000
# log cleanup policy (delete | compact)
log.cleanup.policy=delete
# log retention time (hours | minutes); default is 7 days (168 hours). Older data is handled according to the cleanup policy. Cleanup is triggered by whichever limit is reached first, the time limit or log.retention.bytes.
log.retention.hours=168
# maximum number of bytes of log data to retain; once this size is exceeded, the data is handled according to the cleanup policy
# log.retention.bytes=1073741824
# size of a log segment file; once a segment reaches this size, a new segment file is started
log.segment.bytes=536870912
# force a new segment once this much time (hours) has passed, even if the segment is not full (24 * 7 = 168)
log.roll.hours=168
# how often (ms) log segments are checked against the retention settings (log.retention.hours or log.retention.bytes)
log.retention.check.interval.ms=60000
# whether to enable the log cleaner, which is needed for the compact cleanup policy
log.cleaner.enable=false
# how long (ms) delete markers are kept in compacted logs (1 day)
log.cleaner.delete.retention.ms=86400000
# maximum size (bytes) of the index file of a log segment (10 * 1024 * 1024)
log.index.size.max.bytes=10485760
# number of bytes between entries added to the offset index; generally does not need to be changed
log.index.interval.bytes=4096
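The interaction between the time-based and size-based retention limits can be modelled as follows. This is a simplified sketch of the rule described in the comments (cleanup is triggered by whichever limit is reached first), not Kafka's actual implementation; the `Segment` type and the `segments_to_delete` function are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    size_bytes: int
    age_hours: float  # time since the segment was last written


def segments_to_delete(segments, retention_hours=168, retention_bytes=None):
    """Return the segments that either retention limit marks for deletion.

    `segments` must be ordered oldest to newest. A segment is dropped when it
    is older than retention_hours, or while the total log size still exceeds
    retention_bytes (None means there is no size limit).
    """
    doomed = []
    total = sum(s.size_bytes for s in segments)
    for seg in segments:
        too_old = seg.age_hours > retention_hours
        too_big = retention_bytes is not None and total > retention_bytes
        if not (too_old or too_big):
            break  # newer segments are neither older nor pushing the size up
        doomed.append(seg)
        total -= seg.size_bytes
    return doomed


log = [Segment(536870912, 200), Segment(536870912, 100), Segment(536870912, 1)]
# Only the 200-hour-old segment exceeds the 168-hour limit.
print(len(segments_to_delete(log, retention_hours=168)))
```

Passing a `retention_bytes` value smaller than the remaining log size causes further old segments to be selected even though they are within the time limit.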
## Replica ##
# socket timeout (ms) for communication between the partition-management controller and the replicas
controller.socket.timeout.ms=30000
# size of the controller-to-broker channel message queue
controller.message.queue.size=10
# maximum time (ms) a replica may take to respond to the leader; a replica that exceeds it is removed from the in-sync replica set
replica.lag.time.max.ms=10000
# whether controlled shutdown of the broker is enabled; if true, leadership of all partitions led by this broker is moved to other brokers before it shuts down
controlled.shutdown.enable=false
# number of controlled-shutdown attempts
controlled.shutdown.max.retries=3
# interval (ms) between shutdown attempts
controlled.shutdown.retry.backoff.ms=5000
# if a replica lags behind the leader by more than this many messages, it is considered out of sync. Replicas always lag somewhat because of network delay and similar causes; when the lag is severe, the leader assumes the replica has high network latency or limited throughput. In environments with few brokers or a constrained network, increasing this value is recommended.
replica.lag.max.messages=4000
# socket timeout (ms) between leader and replicas
replica.socket.timeout.ms=30000
# socket receive buffer (bytes) used when replicating from the leader (64 * 1024)
replica.socket.receive.buffer.bytes=65536
# maximum number of bytes a replica fetches at a time (1024 * 1024)
replica.fetch.max.bytes=1048576
# maximum time (ms) a replica's fetch request waits at the leader; on failure the fetch is retried
replica.fetch.wait.max.ms=500
# minimum amount of data (bytes) for each fetch; if the leader has less unsynchronized data than this, the fetch waits until that much is available
replica.fetch.min.bytes=1
# number of threads used to replicate from leaders; increasing it increases the I/O of the replica
num.replica.fetchers=1
# interval (ms) at which each replica writes its high watermark to disk
replica.high.watermark.checkpoint.interval.ms=5000
# whether partition leadership is rebalanced across brokers automatically
auto.leader.rebalance.enable=false
# leader imbalance ratio (percent) allowed per broker; if it is exceeded, leadership is rebalanced
leader.imbalance.per.broker.percentage=10
# interval (seconds) at which leader imbalance is checked
leader.imbalance.check.interval.seconds=300
# maximum amount of metadata (bytes) a client may store with an offset
offset.metadata.max.bytes=1024
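The two lag settings in this section, `replica.lag.time.max.ms` and `replica.lag.max.messages`, can be read as a pair of conditions that every follower has to satisfy. The sketch below models that rule in a simplified way; `is_in_sync` and its parameters are hypothetical names rather than broker APIs, and the defaults are simply the values from the listing.

```python
def is_in_sync(ms_since_last_fetch, messages_behind,
               lag_time_max_ms=10000, lag_max_messages=4000):
    """Return True while a follower satisfies both lag limits."""
    stuck = ms_since_last_fetch > lag_time_max_ms  # follower stopped responding
    slow = messages_behind > lag_max_messages      # follower fell too far behind
    return not (stuck or slow)


print(is_in_sync(ms_since_last_fetch=200, messages_behind=50))    # True
print(is_in_sync(ms_since_last_fetch=200, messages_behind=5000))  # False
print(is_in_sync(ms_since_last_fetch=15000, messages_behind=0))   # False
```

Raising `lag_max_messages`, as the comment recommends for small or network-constrained clusters, keeps a slow but live follower in the in-sync set for longer.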
## Consumer ##
# the core consumer settings are group.id and zookeeper.connect
# unique group ID; processes that set the same group.id belong to the same consumer group
group.id
# ID of this consumer; generated automatically if not set
consumer.id
# an ID used for tracing and troubleshooting; preferably the same as group.id
client.id=
# ZooKeeper cluster to use; must be the same ZooKeeper configuration as the brokers use
zookeeper.connect=debugo01:2182,debugo02:2182,debugo03:2182
# ZooKeeper session (heartbeat) timeout (ms); a consumer that stays silent longer than this is considered dead
zookeeper.session.timeout.ms=6000
# maximum time (ms) to wait while connecting to ZooKeeper
zookeeper.connection.timeout.ms=6000
# how far (ms) a ZooKeeper follower may lag behind the ZooKeeper leader
zookeeper.sync.time.ms=2000
# what to do when ZooKeeper holds no initial offset, or when the offset is out of range
# smallest: reset to the smallest offset
# largest: reset to the largest offset
# anything else: throw an exception to the consumer
auto.offset.reset=largest
# socket timeout (ms); the actual timeout is fetch.wait.max.ms + socket.timeout.ms
socket.timeout.ms=30000
# socket receive buffer size (bytes) (64 * 1024)
socket.receive.buffer.bytes=65536
# maximum number of bytes fetched from each partition per request (1024 * 1024)
fetch.message.max.bytes=1048576
# if true, the consumer commits the offsets of consumed messages to ZooKeeper, so that when it fails a new consumer can pick up the latest offset from ZooKeeper
auto.commit.enable=true
# interval (ms) between automatic commits (60 * 1000)
auto.commit.interval.ms=60000
# maximum number of message chunks buffered for consumption; each chunk can be as large as fetch.message.max.bytes
queued.max.message.chunks=10
# when a new consumer joins the group, a rebalance is attempted and some partitions are moved to the new consumer; this is the maximum number of attempts
rebalance.max.retries=4
# interval (ms) between rebalance attempts
rebalance.backoff.ms=2000
# time (ms) to wait before looking up the new leader of a partition that has just lost its leader
refresh.leader.backoff.ms
# minimum amount of data (bytes) the server returns for a fetch request; if less is available, the request waits until this much has accumulated. The default of 1 returns data as soon as any is available.
fetch.min.bytes=1
# maximum time (ms) the server holds a fetch request when fetch.min.bytes is not yet satisfied
fetch.wait.max.ms=100
# throw an exception if no message becomes available for consumption within this time (ms); the default of -1 means no limit
consumer.timeout.ms=-1
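The behaviour of `auto.offset.reset` described in this section amounts to a small decision rule. The sketch below is an illustration only, under the assumption that the consumer knows the smallest and largest offsets available for a partition; `resolve_start_offset` and its parameters are hypothetical names, not part of the consumer API.

```python
def resolve_start_offset(committed, earliest, latest, auto_offset_reset="largest"):
    """Pick the offset a consumer starts from for one partition.

    committed: offset stored for the group, or None if there is none.
    earliest, latest: smallest and largest offsets currently available.
    """
    if committed is not None and earliest <= committed <= latest:
        return committed  # a valid stored offset always wins
    if auto_offset_reset == "smallest":
        return earliest
    if auto_offset_reset == "largest":
        return latest
    # Any other setting: report the problem to the consumer.
    raise ValueError("no valid offset and auto.offset.reset=%r" % auto_offset_reset)


print(resolve_start_offset(None, earliest=10, latest=50))                       # 50
print(resolve_start_offset(42, earliest=10, latest=50))                         # 42
print(resolve_start_offset(5, earliest=10, latest=50, auto_offset_reset="smallest"))  # 10
```

With `largest`, a new consumer group only sees messages produced after it starts; `smallest` replays everything still retained in the log.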
## Producer ##
# the core settings are:
# metadata.broker.list
# request.required.acks
# producer.type
# serializer.class
# brokers from which the producer fetches metadata (topics, partitions and replicas); the format is host1:port1,host2:port2. A VIP in front of the brokers can also be used.
metadata.broker.list
# acknowledgement mode
# 0: do not wait for any acknowledgement; lowest latency, but messages can be lost if the server fails (fire-and-forget, a bit like UDP)
# 1: wait until the leader has acknowledged the message; a reasonable degree of reliability
# -1: wait until the leader has acknowledged the message and it has been replicated before returning; highest reliability
request.required.acks=0
# maximum time (ms) to wait for a send request to be acknowledged
request.timeout.ms=10000
# socket send buffer size (bytes) (100 * 1024)
send.buffer.bytes=102400
# serializer class for keys; if not set, the same as serializer.class
key.serializer.class
# partitioner class; the default takes the hash of the key modulo the number of partitions
partitioner.class=kafka.producer.DefaultPartitioner
# message compression codec; default is none; gzip and snappy are also available
compression.codec=none
# restricts compression to specific topics
compressed.topics=null
# number of retries after a failed send
message.send.max.retries=3
# interval (ms) to wait before each retry
retry.backoff.ms=100
# interval (ms) at which the producer refreshes topic metadata; if set to 0, metadata is refreshed after every message sent (600 * 1000)
topic.metadata.refresh.interval.ms=600000
# chosen freely by the user, but must not be repeated; mainly used to trace and log messages
client.id=
# maximum time (ms) to buffer data in asynchronous mode. For example, if set to 100, messages arriving within 100 ms are collected and sent together, which increases throughput but also increases the delay of each message.
queue.buffering.max.ms=5000
# maximum number of messages buffered in asynchronous mode
queue.buffering.max.messages=10000
# how long (ms) a message waits to enter the queue in asynchronous mode. If set to 0, the message does not wait and is dropped if the queue is full; -1 waits without limit.
queue.enqueue.timeout.ms=-1
# number of messages sent per batch in asynchronous mode; the producer sends as soon as either batch.num.messages or queue.buffering.max.ms is reached
batch.num.messages=200
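The asynchronous batching settings can be illustrated with a toy model: a batch is sent when it holds `batch.num.messages` messages or when `queue.buffering.max.ms` has elapsed since the first buffered message, whichever happens first. The `AsyncBatcher` class below is a hypothetical sketch, not the producer's implementation. In particular it only checks the timer when a message is added, whereas a real producer would also send on a background timer, and time is passed in explicitly so the behaviour is deterministic.

```python
class AsyncBatcher:
    """Toy model of async-mode batching; time is supplied by the caller (ms)."""

    def __init__(self, batch_num_messages=200, queue_buffering_max_ms=5000):
        self.batch_num_messages = batch_num_messages
        self.queue_buffering_max_ms = queue_buffering_max_ms
        self.buffer = []
        self.first_enqueued_ms = None

    def add(self, message, now_ms):
        """Buffer a message; return the batch to send, or None to keep waiting."""
        if not self.buffer:
            self.first_enqueued_ms = now_ms
        self.buffer.append(message)
        full = len(self.buffer) >= self.batch_num_messages
        expired = now_ms - self.first_enqueued_ms >= self.queue_buffering_max_ms
        if full or expired:
            batch, self.buffer = self.buffer, []
            return batch
        return None


batcher = AsyncBatcher(batch_num_messages=3, queue_buffering_max_ms=100)
print(batcher.add("a", now_ms=0))    # None
print(batcher.add("b", now_ms=10))   # None
print(batcher.add("c", now_ms=20))   # ['a', 'b', 'c']  (batch is full)
print(batcher.add("d", now_ms=30))   # None
print(batcher.add("e", now_ms=200))  # ['d', 'e']  (buffering time elapsed)
```

A larger `queue_buffering_max_ms` produces bigger batches and higher throughput at the cost of added delay per message, which is the trade-off the comment on `queue.buffering.max.ms` describes.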
That concludes this look at the Kafka configuration properties. I hope it answers your questions. Pairing the explanations with hands-on practice is the best way to learn them, so go and try the settings out.