2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
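How do you implement a high-performance queue with Disruptor? Many newcomers are unclear on this. To help, this article walks through it in detail, first with a working example and then by reading the library source. Readers who need this can follow along and will hopefully come away with something useful.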
Disruptor example

import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy, EventFactory, EventHandler, EventTranslatorOneArg, WaitStrategy}

object DisruptorTest {
  val disruptor = {
    // Factory used to preallocate every slot of the ring buffer
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }
    val threadFactory = new ThreadFactory() {
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    // Ring buffer size is 4 (must be a power of 2), single producer
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy())
    // ThenHandler sees an event only after TestHandler has processed it
    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
    disruptor
  }

  // Copies the published argument into the preallocated event
  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

Reading the source

Disruptor initialization
First, look at the Disruptor constructor:
public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
         new BasicExecutor(threadFactory));
}
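Next, look at RingBuffer.create. It ends up in the fill method, which stores an eventFactory.newInstance() in every slot of the ring buffer as its default value, so all events are preallocated up front.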
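As a library-free illustration of the two ideas to watch for in that source (slots preallocated from a factory, and a power-of-two size so that a sequence maps to a slot with a bit mask), here is a minimal sketch. The class name RingSketch and the use of java.util.function.Supplier are assumptions made for this example and are not part of Disruptor; the sketch also leaves out the BUFFER_PAD padding that the real class adds around the array.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified ring of preallocated slots; not Disruptor's RingBuffer. */
final class RingSketch<E> {
    private final Object[] entries;
    private final int indexMask;

    RingSketch(Supplier<E> factory, int bufferSize) {
        // Exactly one bit set means bufferSize is a power of two.
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        // Preallocate every slot once; slots are reused instead of reallocated.
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    /** Maps an ever-growing sequence onto a slot; same as sequence % bufferSize. */
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & indexMask)];
    }

    public static void main(String[] args) {
        RingSketch<int[]> ring = new RingSketch<>(() -> new int[] {-1}, 4);
        ring.get(5)[0] = 42;                             // 5 & 3 == 1, so this writes slot 1
        System.out.println(ring.get(1)[0]);              // prints 42
        System.out.println(ring.get(5) == ring.get(1));  // prints true
    }
}
```

With a size of 4, sequences 1, 5, 9 and so on all land on the same preallocated object, which is why a publisher overwrites fields of an existing event rather than creating a new one.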
public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();
    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }
    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

Consuming events
Start with disruptor.start(), the entry point for event consumption:
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }
    return ringBuffer;
}
consumerRepository is populated by disruptor.handleEventsWith(TestHandler), which also builds the event-handler chain:
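To make the idea of a handler chain concrete, the following is a rough, single-threaded sketch of sequence gating: each stage may only process sequences that the stage before it has already finished. ChainSketch, Stage and drain are hypothetical names invented for this illustration. In Disruptor itself every handler runs on its own thread and waits through a sequence barrier, so treat this as a model of the ordering rule only, not of the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of gated stages in a handler chain; not Disruptor code. */
final class ChainSketch {

    /** One stage: handles every sequence its upstream has already passed. */
    static final class Stage {
        final String name;
        final AtomicLong upstream;                       // sequence this stage is gated on
        final AtomicLong sequence = new AtomicLong(-1);  // last sequence this stage finished
        final List<String> log;

        Stage(String name, AtomicLong upstream, List<String> log) {
            this.name = name;
            this.upstream = upstream;
            this.log = log;
        }

        /** Handles whatever is available and returns how many events were handled. */
        int drain() {
            long available = upstream.get();
            int handled = 0;
            for (long next = sequence.get() + 1; next <= available; next++) {
                log.add(name + ":" + next);
                sequence.set(next);  // publish progress so downstream stages may follow
                handled++;
            }
            return handled;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        AtomicLong cursor = new AtomicLong(-1);              // last published sequence
        Stage first = new Stage("first", cursor, log);       // gated on the producer
        Stage second = new Stage("then", first.sequence, log); // gated on the first stage

        cursor.set(1);   // the producer publishes sequences 0 and 1
        second.drain();  // handles nothing: "first" has not processed anything yet
        first.drain();   // handles 0 and 1
        second.drain();  // now allowed to handle 0 and 1
        System.out.println(log);  // prints [first:0, first:1, then:0, then:1]
    }
}
```

This mirrors what the example's handleEventsWith(TestHandler).then(ThenHandler) asks for: ThenHandler never observes an event before TestHandler is done with it.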
public final EventHandlerGroup handleEventsWith(final EventHandler