This article mainly explains how to implement the Storm-kafka interface. Interested readers may wish to have a look: the approach introduced here is simple, fast and practical. Let's walk through it together.
Reading background: if necessary, please refer to the other documents in this space.
Purpose: to understand how Storm encapsulates the kafka interface, and how it handles the encapsulation of Connection objects.
package com.mixbox.storm.kafka;

import kafka.javaapi.consumer.SimpleConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.trident.IBrokerReader;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * 2014-07-22 Dynamic [partition connections]
 * @author Yin Shuai
 */
public class DynamicPartitionConnections {

    public static final Logger LOG = LoggerFactory
            .getLogger(DynamicPartitionConnections.class);

    /**
     * Holds an underlying kafka SimpleConsumer object, together with the
     * set of partitions served through it.
     * @author Yin Shuai
     */
    static class ConnectionInfo {
        // The SimpleConsumer maintained internally
        SimpleConsumer consumer;
        // The partitions registered against this consumer
        Set<Integer> partitions = new HashSet<Integer>();

        public ConnectionInfo(SimpleConsumer consumer) {
            this.consumer = consumer;
        }
    }

    /** One ConnectionInfo is maintained per kafka broker node. */
    Map<Broker, ConnectionInfo> _connections = new HashMap<Broker, ConnectionInfo>();

    // kafka configuration
    KafkaConfig _config;

    /** The IBrokerReader; in practice it is initialized here as a ZkBrokerReader. */
    IBrokerReader _reader;

    /**
     * @param config       kafka configuration
     * @param brokerReader IBrokerReader - used to obtain the current brokers
     */
    public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
        _config = config;
        _reader = brokerReader;
    }

    /**
     * @param partition partition
     * @return the SimpleConsumer for the broker that owns this partition
     */
    public SimpleConsumer register(Partition partition) {
        // Look up the Broker for the given partition number: the
        // GlobalPartitionInformation keeps a Map (partitionMap) that records
        // the relationship between partition numbers and Brokers.
        Broker broker = _reader.getCurrentBrokers().getBrokerFor(partition.partition);
        return register(broker, partition.partition);
    }

    /**
     * @param host      host [broker]
     * @param partition partition
     * @return the underlying SimpleConsumer object. Registering here means
     *         recording the broker (host and port) and the partition in the
     *         _connections map.
     */
    public SimpleConsumer register(Broker host, int partition) {
        // If the Broker is not yet contained in the connections map, a new
        // connection is established, and the host and connection are
        // registered in _connections.
        if (!_connections.containsKey(host)) {
            _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host,
                    host.port, _config.socketTimeoutMs, _config.bufferSizeBytes,
                    _config.clientId)));
        }
        // From here on the entry exists, whether it was just created or was
        // there before: take it out directly and record the partition on it.
        ConnectionInfo info = _connections.get(host);
        info.partitions.add(partition);
        return info.consumer;
    }

    public SimpleConsumer getConnection(Partition partition) {
        // ConnectionInfo encapsulates a SimpleConsumer
        ConnectionInfo info = _connections.get(partition.host);
        if (info != null) {
            return info.consumer;
        }
        return null;
    }

    /**
     * @param port      a fixed Broker
     * @param partition a fixed partition
     */
    public void unregister(Broker port, int partition) {
        ConnectionInfo info = _connections.get(port);
        info.partitions.remove(partition);
        if (info.partitions.isEmpty()) {
            info.consumer.close();
            _connections.remove(port);
        }
    }

    public void unregister(Partition partition) {
        unregister(partition.host, partition.partition);
    }

    public void clear() {
        for (ConnectionInfo info : _connections.values()) {
            info.consumer.close();
        }
        _connections.clear();
    }
}
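Before going line by line, here is a minimal usage sketch (not from the original source) showing the lifecycle this class is built around. The KafkaConfig, IBrokerReader and Partition arguments are assumed to be supplied by the surrounding spout code; their construction is out of scope here.

package com.mixbox.storm.kafka;

import kafka.javaapi.consumer.SimpleConsumer;
import com.mixbox.storm.kafka.trident.IBrokerReader;

public class ConnectionLifecycleSketch {

    // config, reader and partition are assumed to come from the spout's
    // setup code; this sketch only exercises DynamicPartitionConnections.
    static void demo(KafkaConfig config, IBrokerReader reader, Partition partition) {
        DynamicPartitionConnections connections =
                new DynamicPartitionConnections(config, reader);

        // register() resolves the partition to its broker and returns a
        // shared SimpleConsumer; a second partition on the same broker
        // reuses the same underlying connection.
        SimpleConsumer consumer = connections.register(partition);

        // ... fetch messages for the partition with `consumer` here ...

        // unregister() drops the partition; the consumer is closed only
        // once no partitions remain registered on that broker.
        connections.unregister(partition);

        // clear() closes every remaining connection, e.g. on spout shutdown.
        connections.clear();
    }
}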
Related to the previous article
1: DynamicPartitionConnections holds a reference to the IBrokerReader interface.
2: IBrokerReader has two implementations:
2.1 StaticBrokerReader
2.2 ZkBrokerReader
ZkBrokerReader was analyzed in detail in an earlier post in this series, and during assignment the IBrokerReader here is in fact instantiated as a ZkBrokerReader.
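For reference, the contract this class relies on can be sketched as follows. This is inferred from the calls made above (getCurrentBrokers().getBrokerFor(...)) rather than copied from the source, so treat the exact signatures as an assumption:

public interface IBrokerReader {
    // Returns the current partition-to-broker mapping; register() calls
    // getBrokerFor(partition) on the returned GlobalPartitionInformation.
    GlobalPartitionInformation getCurrentBrokers();

    // Releases any held resources (e.g. the ZooKeeper client in ZkBrokerReader).
    void close();
}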
Inner class:
DynamicPartitionConnections holds an inner class, ConnectionInfo:
static class ConnectionInfo {
    // The SimpleConsumer maintained internally
    SimpleConsumer consumer;
    // The partitions served through this consumer
    Set<Integer> partitions = new HashSet<Integer>();

    public ConnectionInfo(SimpleConsumer consumer) {
        this.consumer = consumer;
    }
}
1: each ConnectionInfo maintains a SimpleConsumer internally, as well as a Set<Integer> collection of partitions.
2: DynamicPartitionConnections maintains a _connections map:
Map<Broker, ConnectionInfo> _connections = new HashMap<Broker, ConnectionInfo>();
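To make the sharing concrete, here is a small hypothetical sequence (the broker b and the two partition numbers are assumptions for illustration, not from the original post):

// Assume broker b hosts partitions 0 and 1.
connections.register(b, 0);   // no entry for b yet: creates its ConnectionInfo
connections.register(b, 1);   // entry exists: reuses it; partitions == {0, 1}
// _connections.size() == 1 -- one SimpleConsumer is shared by both partitions
connections.unregister(b, 0); // partition 1 still registered: consumer stays open
connections.unregister(b, 1); // last partition removed: consumer.close() is called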
3: the key point of connection maintenance is the register behavior:
public SimpleConsumer register(Broker host, int partition)
4: if the Broker is not yet contained in _connections, a new connection is established, and the Broker and its ConnectionInfo are registered in _connections.
5: during registration, whether or not the broker was already present, the ConnectionInfo is finally taken straight out of the map and its SimpleConsumer is returned. That SimpleConsumer is the one encapsulated by:

new ConnectionInfo(new SimpleConsumer(
        host.host, host.port, _config.socketTimeoutMs,
        _config.bufferSizeBytes, _config.clientId))
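As a side note on the design, the body of register is the classic get-or-create idiom. On a Java 8+ codebase the same logic could be written with Map.computeIfAbsent, for example (a sketch, not the original code, which predates Java 8 and keeps the explicit containsKey check):

public SimpleConsumer register(Broker host, int partition) {
    // Create the connection for this broker only if it is absent, then
    // record the partition on it -- identical behavior to the version above.
    ConnectionInfo info = _connections.computeIfAbsent(host,
            h -> new ConnectionInfo(new SimpleConsumer(h.host, h.port,
                    _config.socketTimeoutMs, _config.bufferSizeBytes,
                    _config.clientId)));
    info.partitions.add(partition);
    return info.consumer;
}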
At this point, I believe you have a deeper understanding of how to implement the Storm-kafka interface. You might as well try it out in practice.