2025-01-31 Update From: SLTechnology News&Howtos
This article shows how to integrate Disruptor, Kafka, and Netty, and should serve as a useful reference for anyone building a similar gateway. I hope you learn something from reading it.
Netty Application Gateway
The core of the gateway is a Netty server. Client applications (web servers, mobile apps, and so on) connect to it to request data. On the data-source side, the gateway must listen to multiple Kafka topics, and the set of topics changes over time, so Kafka consumers have to be started and stopped dynamically. The data from all of these topics must then be merged into a single stream and pushed to the client applications through their Netty channels.
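Before diving into the real code, here is a library-free sketch of the fan-in idea described above: several producer threads (standing in for per-topic Kafka consumers) feed one shared bounded queue, and a single drain point (standing in for the Disruptor ring buffer plus the Netty write path) merges everything. All names here (`FanInSketch`, `merge`) are illustrative and not part of the gateway code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FanInSketch {
    // Merge 'perSource' messages from each named source into one list,
    // using a bounded queue as the shared buffer between threads.
    static List<String> merge(String[] sources, int perSource) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        for (String src : sources) {
            new Thread(() -> {
                for (int i = 0; i < perSource; i++) {
                    try {
                        queue.put(src + ":msg" + i); // simulated Kafka record
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }
        // Single drain point: in the real gateway this is where records
        // would be handed to the Netty channels.
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < sources.length * perSource; i++) {
            delivered.add(queue.take());
        }
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(merge(new String[]{"uav-1", "uav-2"}, 3).size());
    }
}
```

The real gateway replaces the `BlockingQueue` with a Disruptor ring buffer, which avoids lock contention between the many producer threads.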
Data flow graph
Source code
Most of the code is posted below for anyone who needs it. I will explain the key technical points; please ignore the business-specific parts.
Main function
Start the Disruptor, then listen on a fixed topic; each message received is handed to ConsumerProcessorGroup, which creates and stops Kafka consumers accordingly.
public static void main(String[] args) {
    // Start the Disruptor before any producer can publish to it.
    DisruptorHelper.getInstance().start();
    Properties props = ConsumerProps.getConsumerProps();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("uavlst"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // Only the latest record matters: it carries the full current topic list.
        ConsumerRecord<String, String> lastRecord = null;
        for (ConsumerRecord<String, String> record : records) {
            lastRecord = record;
        }
        if (lastRecord != null) {
            ConsumerProcessorGroup.getInstance().recieveNewUavLst(lastRecord.value());
        }
    }
}
DisruptorHelper
DisruptorHelper is a singleton that mainly wraps a single Disruptor object. The object is constructed with ProducerType.MULTI and new BlockingWaitStrategy(): the former because multiple producers (the Kafka consumer threads) publish to the ring buffer concurrently, and the latter because BlockingWaitStrategy is the default wait strategy; it can be tuned later according to the actual workload.
public class DisruptorHelper {
    private static DisruptorHelper instance = null;

    public static DisruptorHelper getInstance() {
        if (instance == null) {
            instance = new DisruptorHelper();
        }
        return instance;
    }

    private final int BUFFER_SIZE = 1024;
    private Disruptor<MsgEvent> disruptor = null;

    private DisruptorHelper() {
        MsgEventHandler eventHandler = new MsgEventHandler();
        // MULTI: several Kafka consumer threads publish concurrently;
        // BlockingWaitStrategy is the default and can be swapped later.
        disruptor = new Disruptor<>(new MsgEventFactory(), BUFFER_SIZE,
                new ConsumerThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
        disruptor.handleEventsWith(eventHandler);
    }

    public void start() {
        disruptor.start();
    }

    public void shutdown() {
        disruptor.shutdown();
    }

    public void produce(ConsumerRecord record) {
        RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
        long sequence = ringBuffer.next();              // claim the next slot
        try {
            ringBuffer.get(sequence).setRecord(record); // fill the slot
        } finally {
            ringBuffer.publish(sequence);               // make it visible to the handler
        }
    }
}
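The produce() method above follows the Disruptor's claim/fill/publish protocol. The deliberately simplified, single-threaded sketch below illustrates why publish() sits in a finally block; it is not the LMAX implementation, which adds lock-free multi-producer coordination, sequence barriers, and wait strategies. All names here (`MiniRingBuffer`) are illustrative.

```java
// Single-threaded sketch of the claim/publish protocol, power-of-two sized.
public class MiniRingBuffer {
    private final Object[] slots;
    private final int mask;       // size must be a power of two
    private long nextSeq = 0;     // next slot to claim
    private long published = -1;  // highest published sequence

    public MiniRingBuffer(int size) {
        slots = new Object[size];
        mask = size - 1;
    }

    public long next() {                       // cf. ringBuffer.next()
        return nextSeq++;
    }

    public void set(long seq, Object value) {  // cf. get(seq).setRecord(...)
        slots[(int) (seq & mask)] = value;
    }

    public void publish(long seq) {            // cf. ringBuffer.publish(seq)
        published = seq;
    }

    public Object get(long seq) {
        if (seq > published) throw new IllegalStateException("not published yet");
        return slots[(int) (seq & mask)];
    }

    public static void main(String[] args) {
        MiniRingBuffer rb = new MiniRingBuffer(8);
        long seq = rb.next();
        try {
            rb.set(seq, "record-0");
        } finally {
            rb.publish(seq);  // always publish the claimed slot, as in produce()
        }
        System.out.println(rb.get(0)); // record-0
    }
}
```

Publishing in finally matters because a claimed-but-never-published sequence would stall every consumer waiting on that slot.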
ConsumerProcessorGroup
ConsumerProcessorGroup is a singleton holding a fixed thread pool, from which threads are started dynamically to consume Kafka topics.
public class ConsumerProcessorGroup {
    private static ConsumerProcessorGroup instance = null;

    public static ConsumerProcessorGroup getInstance() {
        if (instance == null) {
            instance = new ConsumerProcessorGroup();
        }
        return instance;
    }

    private ConsumerProcessorGroup() {
    }

    private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);
    // Vector is synchronized: the consumer threads read it while this class updates it.
    public List<String> uavIDLst = new Vector<>();

    public void recieveNewUavLst(String uavIDs) {
        List<String> newUavIDs = Arrays.asList(uavIDs.split(","));
        // Start a consumer thread for every topic that is new in this update.
        for (String uavID : newUavIDs) {
            if (!uavIDLst.contains(uavID)) {
                fixedThreadPool.execute(new ConsumerThread(uavID));
                uavIDLst.add(uavID);
            }
        }
        // Collect topics that disappeared; their threads will notice and exit.
        List<String> tmpLstForDel = new ArrayList<>();
        for (String uavID : uavIDLst) {
            if (!newUavIDs.contains(uavID)) {
                tmpLstForDel.add(uavID);
            }
        }
        uavIDLst.removeAll(tmpLstForDel);
    }
}
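The stop mechanism above is cooperative: removing an ID from the shared list is enough, because each worker re-checks membership on every loop iteration and exits on its own. A library-free sketch of this lifecycle pattern (all names here, `LifecycleSketch` and `startWorker`, are illustrative):

```java
import java.util.List;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;

public class LifecycleSketch {
    // Shared, synchronized list of active IDs, like uavIDLst above.
    static final List<String> active = new Vector<>();

    // The worker polls as long as its ID stays in the list, mirroring
    // ConsumerThread's while-condition; no explicit interrupt is needed.
    static Thread startWorker(String id, AtomicInteger polls) {
        active.add(id);
        Thread t = new Thread(() -> {
            while (active.contains(id)) {
                polls.incrementAndGet(); // stands in for consumer.poll(...)
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger polls = new AtomicInteger();
        Thread worker = startWorker("uav-42", polls);
        Thread.sleep(20);         // let it run a few poll cycles
        active.remove("uav-42");  // the next membership check ends the loop
        worker.join(1000);
        System.out.println(worker.isAlive()); // false: the worker stopped itself
    }
}
```

One trade-off to be aware of: a worker only notices removal after its current poll returns, so shutdown latency is bounded by the poll timeout.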
ConsumerThread
ConsumerThread consumes a Kafka topic and writes each record it receives into the Disruptor's ring buffer via DisruptorHelper.
public class ConsumerThread implements Runnable {
    private String uavID;

    public ConsumerThread(String uavID) {
        this.uavID = uavID;
    }

    public void run() {
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(uavID));
        System.out.println(uavID + " consumer started! Current thread id is " + Thread.currentThread().getId());
        // Keep polling while this topic is still in the active list; exit otherwise.
        while (ConsumerProcessorGroup.getInstance().uavIDLst.contains(uavID)) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                DisruptorHelper.getInstance().produce(record);
            }
        }
        consumer.close();
        System.out.println(uavID + " consumer finished! Current thread id is " + Thread.currentThread().getId());
    }
}
MsgEventHandler
MsgEventHandler is the Disruptor's consumer: it reads events from the ring buffer in sequence and performs the corresponding processing.
public class MsgEventHandler implements EventHandler<MsgEvent> {
    private Map converterMap; // reserved for business-specific conversion

    public void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {
        ConsumerRecord record = event.getRecord();
        System.out.printf("topic = %s, part = %d, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
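The handler above ignores the endOfBatch flag, but in a gateway it is the natural hook for batching expensive work such as network writes to clients. A hedged sketch of that common Disruptor pattern (the class and its fields, `BatchingHandlerSketch` and `flushed`, are illustrative, not part of the article's code):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingHandlerSketch {
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>();

    // Same shape as Disruptor's EventHandler.onEvent: buffer each event and
    // flush only when endOfBatch signals the ring buffer has drained.
    public void onEvent(String value, long sequence, boolean endOfBatch) {
        buffer.add(value);
        if (endOfBatch) {
            flushed.add(new ArrayList<>(buffer)); // e.g. one channel write per batch
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BatchingHandlerSketch h = new BatchingHandlerSketch();
        h.onEvent("a", 0, false);
        h.onEvent("b", 1, true);   // first batch ends here
        h.onEvent("c", 2, true);   // a batch of one
        System.out.println(h.flushed); // [[a, b], [c]]
    }
}
```

Because the Disruptor delivers events to a handler on a single thread, this buffering needs no extra synchronization.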
Thank you for reading. I hope this article on integrating Disruptor, Kafka, and Netty has been helpful.