Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to integrate Disruptor, Kafka and Netty

2025-01-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces how to integrate Disruptor, Kafka and Netty, which has a certain reference value. Interested friends can refer to it. I hope you can learn a lot after reading this article.

NETTY Application Gateway

The core of the whole gateway is a netty server, and various applications (including web server, mobile phone app, etc.) are connected to this netty server to request data; as for the data source, you need to monitor multiple kafka topic (and the topic here is variable, that is, you need to dynamically start and stop the kafka consumer), and then you need to integrate all these topic data together and send them to the client application through channel.

Data flow graph

Source code

Below, post most of the code, which can be referenced by the students in need. Will explain the key technical points, partial to the business part of everyone to ignore it.

Main function

Start disruptor; to listen to a fixed topic, give the acquired msg to ConsumerProcessorGroup to complete the creation and stop of kafka consumer.

Public static void main (String [] args) {

DisruptorHelper.getInstance () .start ()

Properties props = ConsumerProps.getConsumerProps ()

KafkaConsumer consumer = new KafkaConsumer (props)

Consumer.subscribe (Arrays.asList ("uavlst"))

While (true) {

ConsumerRecords records = consumer.poll

ConsumerRecord lastRecord = null

For (ConsumerRecord record: records)

LastRecord = record

If (lastRecord! = null) {

ConsumerProcessorGroup.getInstance () .recieveNewUavLst (lastRecord.value ()

}

}

}

DisruptorHelper

DisruptorHelper is a singleton, which mainly contains a disruptor object. ProducerType.MULTI and new BlockingWaitStrategy () are used when new this object, where the former means that we need multiple producer to work together, while the latter is actually the default producer waiting policy, which will be adjusted later according to the actual situation.

Public class DisruptorHelper {

Private static DisruptorHelper instance = null

Public static DisruptorHelper getInstance () {

If (instance = = null) {

Instance = new DisruptorHelper ()

}

Return instance

}

Private final int BUFFER_SIZE = 1024

Private Disruptor disruptor = null

Private DisruptorHelper () {

MsgEventHandler eventHandler = new MsgEventHandler ()

Disruptor = new Disruptor (new MsgEventFactory (), BUFFER_SIZE, new ConsumerThreadFactory (), ProducerType.MULTI, new BlockingWaitStrategy ())

Disruptor.handleEventsWith (eventHandler)

}

Public void start () {

Disruptor.start ()

}

Public void shutdown () {

Disruptor.shutdown ()

}

Public void produce (ConsumerRecord record) {

RingBuffer ringBuffer = disruptor.getRingBuffer ()

Long sequence = ringBuffer.next ()

Try {

RingBuffer.get (sequence) .setRecord (record)

} finally {

RingBuffer.publish (sequence)

}

}

}

ConsumerProcessorGroup

ConsumerProcessorGroup is a singleton that contains a fixedThreadPool that starts threads dynamically to consume kafka topic.

Public class ConsumerProcessorGroup {

Private static ConsumerProcessorGroup instance = null

Public static ConsumerProcessorGroup getInstance () {

If (instance = = null) {

Instance = new ConsumerProcessorGroup ()

}

Return instance

}

Private ConsumerProcessorGroup () {

}

Private ExecutorService fixedThreadPool = Executors.newFixedThreadPool (20)

Public List uavIDLst = new Vector ()

Public void recieveNewUavLst (String uavIDs) {

List newUavIDs = Arrays.asList (uavIDs.split (",")

For (String uavID: newUavIDs) {

If (! uavIDLst.contains (uavID)) {

FixedThreadPool.execute (new ConsumerThread (uavID))

UavIDLst.add (uavID)

}

}

List tmpLstForDel = new ArrayList ()

For (String uavID: uavIDLst) {

If (! newUavIDs.contains (uavID)) {

TmpLstForDel.add (uavID)

}

}

UavIDLst.removeAll (tmpLstForDel)

}

}

ConsumerThread

Kafka topic is consumed, and the acquired record is written to the ring buffer of disruptor through DisruptorHelper.

Public class ConsumerThread implements Runnable {

Private String uavID

Public ConsumerThread (String uavID) {

This.uavID = uavID

}

Public void run () {

Properties props = ConsumerProps.getConsumerProps ()

KafkaConsumer consumer = new KafkaConsumer (props)

Consumer.subscribe (Arrays.asList (uavID))

System.out.println (uavID + consumer started! Current thread id is + Thread.currentThread () .getId ())

While (ConsumerProcessorGroup.getInstance () .uavIDLst.contains (uavID)) {

ConsumerRecords records = consumer.poll

For (ConsumerRecord record: records) {

DisruptorHelper.getInstance () produce (record)

}

}

System.out.println (uavID + consumer finished! Current thread id is + Thread.currentThread () .getId ())

}

}

MsgEventHandler

The consumer of the Disruptor, in turn, reads the data from the Ring Buffer and performs the corresponding processing.

Public class MsgEventHandler implements EventHandler {

Private Map converterMap

Public void onEvent (MsgEvent event, long sequence, boolean endOfBatch) throws Exception {

ConsumerRecord record = event.getRecord ()

System.out.printf ("topic =% s, part =% d, offset =% d, key =% s, value =% s\ n\ r", record.topic (), record.partition (), record.offset (), record.key (), record.value ())

}

}

Thank you for reading this article carefully. I hope the article "how to integrate Disruptor, Kafka and Netty" shared by the editor will be helpful to everyone. At the same time, I also hope you will support us and pay attention to the industry information channel. More related knowledge is waiting for you to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report