Module com.lmax.disruptor
Package com.lmax.disruptor
The Disruptor is a concurrent programming framework for exchanging and coordinating work as a continuous series of events.
It can be used as an alternative to wiring processing stages together via queues.
The Disruptor design generates significantly less garbage than queues and separates the concurrency concerns so that
non-locking algorithms can be employed, resulting in greater scalability and performance.
It works on the principle of having a number of stages that are each single-threaded with local state and memory. No global memory exists; all communication is achieved by passing messages/state via managed ring buffers.
Almost any graph or pipeline structure can be composed via one or more Disruptor patterns.
UniCast a series of items between 1 publisher and 1 EventProcessor.
                                          track to prevent wrap
                                          +------------------+
                                          |                  |
                                          |                  v
+----+    +-----+            +----+    +====+    +====+   +-----+
| P1 |--->| EP1 |            | P1 |--->| RB |<---| SB |   | EP1 |
+----+    +-----+            +----+    +====+    +====+   +-----+
                                  claim      get    ^        |
                                                    |        |
                                                    +--------+
                                                      waitFor
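The claim, get, waitFor and wrap-tracking steps in the diagram above can be sketched with nothing but the JDK. This is a minimal illustration under stated assumptions, not the library's implementation: the class `UnicastSketch`, its fields and its `run` method are hypothetical names, events are plain `long` values, and both sides busy-spin instead of using a WaitStrategy.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names and structure are illustrative, not the library's internals.
final class UnicastSketch {
    private static final int SIZE = 8;                        // ring size, a power of two
    private final long[] ring = new long[SIZE];               // RB: preallocated slots
    private final AtomicLong cursor = new AtomicLong(-1);     // last published sequence
    private final AtomicLong consumed = new AtomicLong(-1);   // EP1's sequence

    // P1: claim the next sequence, write the slot, then publish it.
    private void publish(long value) {
        long next = cursor.get() + 1;                         // claim
        while (next - SIZE > consumed.get()) {                // track to prevent wrap
            Thread.onSpinWait();
        }
        ring[(int) (next & (SIZE - 1))] = value;
        cursor.set(next);                                     // volatile write publishes the slot
    }

    // EP1: wait for each sequence in turn, read the slot, then advance its own sequence.
    private long consume(int count) {
        long sum = 0;
        for (long next = 0; next < count; next++) {
            while (cursor.get() < next) {                     // waitFor
                Thread.onSpinWait();
            }
            sum += ring[(int) (next & (SIZE - 1))];           // get
            consumed.set(next);
        }
        return sum;
    }

    // Publishes 1..count from the calling thread and returns the sum EP1 observed.
    static long run(int count) {
        UnicastSketch sketch = new UnicastSketch();
        long[] result = new long[1];
        Thread ep1 = new Thread(() -> result[0] = sketch.consume(count));
        ep1.start();
        for (int i = 1; i <= count; i++) {
            sketch.publish(i);
        }
        try {
            ep1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }
}
```

The publisher may only reuse a slot once EP1's sequence has passed the event that previously occupied it, which is why it compares `next - SIZE` with the consumer's sequence before writing.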
Sequence a series of messages from multiple publishers
                                         track to prevent wrap
                                         +--------------------+
                                         |                    |
                                         |                    v
+----+                       +----+    +====+    +====+    +-----+
| P1 |-------+               | P1 |--->| RB |<---| SB |    | EP1 |
+----+       |               +----+    +====+    +====+    +-----+
             v                           ^   get    ^         |
+----+    +-----+            +----+      |          |         |
| P2 |--->| EP1 |            | P2 |------+          +---------+
+----+    +-----+            +----+      |            waitFor
             ^                           |
+----+       |               +----+      |
| P3 |-------+               | P3 |------+
+----+                       +----+
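With several publishers, each one must obtain a unique sequence before writing, and the consumer must not read a slot until that particular sequence has been published. The sketch below illustrates the idea with an atomic counter and a per-slot flag. It is an assumption-laden illustration rather than the library's MultiProducerSequencer: `MultiPublisherSketch` is a hypothetical name, and the ring is sized to hold every event so that wrapping never occurs.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: three publishers sequence their events into one ring for a single consumer.
final class MultiPublisherSketch {
    // Each publisher writes its own id (1, 2 or 3) perPublisher times; returns the sum the consumer saw.
    static long run(int perPublisher) {
        final int publishers = 3;
        final int total = publishers * perPublisher;
        final long[] ring = new long[total];                                  // assumption: large enough never to wrap
        final AtomicIntegerArray available = new AtomicIntegerArray(total);   // 1 once a slot is published
        final AtomicLong nextToClaim = new AtomicLong(0);

        Thread[] threads = new Thread[publishers];
        for (int p = 0; p < publishers; p++) {
            final long id = p + 1;
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perPublisher; i++) {
                    int sequence = (int) nextToClaim.getAndIncrement();       // claim a unique sequence
                    ring[sequence] = id;                                      // write the claimed slot
                    available.set(sequence, 1);                               // publish this sequence
                }
            });
            threads[p].start();
        }

        // EP1 consumes strictly in sequence order, waiting for each slot to be published.
        long sum = 0;
        for (int sequence = 0; sequence < total; sequence++) {
            while (available.get(sequence) == 0) {
                Thread.onSpinWait();
            }
            sum += ring[sequence];
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sum;
    }
}
```

Publishers can finish out of order, so the consumer checks the flag for each individual sequence instead of relying on a single cursor value.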
Pipeline a series of messages
+----+    +-----+    +-----+    +-----+
| P1 |--->| EP1 |--->| EP2 |--->| EP3 |
+----+    +-----+    +-----+    +-----+

             track to prevent wrap
             +----------------------------------------------------------------+
             |                                                                |
             |                                                                v
+----+    +====+    +=====+    +-----+    +=====+    +-----+    +=====+    +-----+
| P1 |--->| RB |    | SB1 |<---| EP1 |<---| SB2 |<---| EP2 |<---| SB3 |<---| EP3 |
+----+    +====+    +=====+    +-----+    +=====+    +-----+    +=====+    +-----+
     claim   ^   get    |  waitFor            |  waitFor            |  waitFor
             |          |                     |                     |
             +----------+---------------------+---------------------+
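In the pipeline above, each stage waits on the sequence of the stage before it rather than on the ring buffer cursor directly. A minimal JDK-only sketch of that dependency chain follows. `PipelineSketch` and its `stage` helper are hypothetical names, the handlers are arbitrary arithmetic chosen so that ordering errors would change the result, and the ring is sized to hold every event so wrap tracking is left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

// Hypothetical sketch: EP1 -> EP2 -> EP3, each gated on the previous stage's sequence.
final class PipelineSketch {
    // Starts a stage that handles sequence n only after dependsOn has reached n.
    private static Thread stage(int count, AtomicLong dependsOn, AtomicLong own, IntConsumer handler) {
        Thread thread = new Thread(() -> {
            for (int sequence = 0; sequence < count; sequence++) {
                while (dependsOn.get() < sequence) {       // waitFor
                    Thread.onSpinWait();
                }
                handler.accept(sequence);
                own.set(sequence);                         // lets the next stage proceed
            }
        });
        thread.start();
        return thread;
    }

    // Publishes the values 0..count-1 and returns the total accumulated by EP3.
    static long run(int count) {
        long[] ring = new long[count];                     // assumption: large enough never to wrap
        long[] total = new long[1];
        AtomicLong cursor = new AtomicLong(-1);
        AtomicLong ep1Seq = new AtomicLong(-1);
        AtomicLong ep2Seq = new AtomicLong(-1);
        AtomicLong ep3Seq = new AtomicLong(-1);

        Thread ep1 = stage(count, cursor, ep1Seq, s -> ring[s] += 1);          // EP1: add one
        Thread ep2 = stage(count, ep1Seq, ep2Seq, s -> ring[s] *= 2);          // EP2: double
        Thread ep3 = stage(count, ep2Seq, ep3Seq, s -> total[0] += ring[s]);   // EP3: accumulate

        for (int i = 0; i < count; i++) {
            ring[i] = i;
            cursor.set(i);                                 // publish
        }
        for (Thread thread : new Thread[] {ep1, ep2, ep3}) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }
}
```

Because EP3 is last in the chain, it is the only sequence the publisher would need to track to prevent wrap in a bounded ring.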
Multicast a series of messages to multiple EventProcessors
          +-----+                                   track to prevent wrap
   +----->| EP1 |                        +--------------------+----------+----------+
   |      +-----+                        |                    |          |          |
   |                                     |                    v          v          v
+----+    +-----+            +----+    +====+    +====+    +-----+    +-----+    +-----+
| P1 |--->| EP2 |            | P1 |--->| RB |<---| SB |    | EP1 |    | EP2 |    | EP3 |
+----+    +-----+            +----+    +====+    +====+    +-----+    +-----+    +-----+
   |                              claim      get    ^         |          |          |
   |      +-----+                                   |         |          |          |
   +----->| EP3 |                                   +---------+----------+----------+
          +-----+                                               waitFor
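When several EventProcessors read the same ring buffer, the publisher is limited by the slowest of them. The arithmetic behind "track to prevent wrap" can be sketched as below. `GatingSketch`, `minimum` and `canClaim` are hypothetical names used only for illustration, and sequences are assumed to start at 0.

```java
// Hypothetical sketch of the wrap check a publisher performs against several gating sequences.
final class GatingSketch {
    // The slowest consumer determines how far the publisher may advance.
    static long minimum(long... sequences) {
        long min = Long.MAX_VALUE;
        for (long sequence : sequences) {
            min = Math.min(min, sequence);
        }
        return min;
    }

    // Sequence `next` reuses the slot last held by `next - bufferSize`,
    // so every consumer must already have processed that earlier sequence.
    static boolean canClaim(long next, int bufferSize, long... gatingSequences) {
        return next - bufferSize <= minimum(gatingSequences);
    }
}
```

For example, with a buffer of 8 slots and consumers at sequences 5, 3 and 7, sequence 11 can be claimed because it overwrites sequence 3, which every consumer has processed. Sequence 12 cannot, because it would overwrite sequence 4 while the slowest consumer is still at 3.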
Replicate a message then fold back the results
          +-----+                               track to prevent wrap
   +----->| EP1 |-----+                   +-------------------------------+
   |      +-----+     |                   |                               |
   |                  v                   |                               v
+----+             +-----+   +----+    +====+               +=====+    +-----+
| P1 |             | EP3 |   | P1 |--->| RB |<--------------| SB2 |<---| EP3 |
+----+             +-----+   +----+    +====+               +=====+    +-----+
   |                  ^           claim   ^  get               |   waitFor
   |      +-----+     |                   |                    |
   +----->| EP2 |-----+                +=====+    +-----+      |
          +-----+                      | SB1 |<---| EP1 |<-----+
                                       +=====+    +-----+      |
                                          ^                    |
                                          |       +-----+      |
                                          +-------| EP2 |<-----+
                                       waitFor    +-----+
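In the diamond above, EP1 and EP2 process each event independently and EP3 may only proceed once both have finished with it. One way to picture this is a barrier that takes the minimum of the two upstream sequences. The sketch below uses only the JDK. `DiamondSketch` and `stage` are hypothetical names, each upstream handler writes its own array (standing in for separate fields of one event), and the ring is sized so that wrapping never occurs.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;

// Hypothetical sketch: EP1 and EP2 run in parallel, EP3 folds their results back together.
final class DiamondSketch {
    // Starts a stage that handles sequence n only after the barrier reports at least n.
    private static Thread stage(int count, LongSupplier barrier, AtomicLong own, IntConsumer handler) {
        Thread thread = new Thread(() -> {
            for (int sequence = 0; sequence < count; sequence++) {
                while (barrier.getAsLong() < sequence) {   // waitFor
                    Thread.onSpinWait();
                }
                handler.accept(sequence);
                own.set(sequence);
            }
        });
        thread.start();
        return thread;
    }

    // Publishes the values 0..count-1 and returns the total accumulated by EP3.
    static long run(int count) {
        long[] input = new long[count];
        long[] doubled = new long[count];                  // written only by EP1
        long[] plusOne = new long[count];                  // written only by EP2
        long[] total = new long[1];
        AtomicLong cursor = new AtomicLong(-1);
        AtomicLong ep1Seq = new AtomicLong(-1);
        AtomicLong ep2Seq = new AtomicLong(-1);
        AtomicLong ep3Seq = new AtomicLong(-1);

        Thread ep1 = stage(count, cursor::get, ep1Seq, s -> doubled[s] = input[s] * 2);
        Thread ep2 = stage(count, cursor::get, ep2Seq, s -> plusOne[s] = input[s] + 1);
        // SB2 in the diagram: EP3 is gated on the slower of EP1 and EP2.
        Thread ep3 = stage(count, () -> Math.min(ep1Seq.get(), ep2Seq.get()), ep3Seq,
                s -> total[0] += doubled[s] + plusOne[s]);

        for (int i = 0; i < count; i++) {
            input[i] = i;
            cursor.set(i);                                 // publish
        }
        for (Thread thread : new Thread[] {ep1, ep2, ep3}) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }
}
```

Giving each upstream processor its own output location keeps every memory location single-writer, which is what lets the stages run without locks.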
Code Example
// Event holder for data to be exchanged
public final class ValueEvent
{
    private long value;

    public long getValue()
    {
        return value;
    }

    public void setValue(final long value)
    {
        this.value = value;
    }

    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
    {
        @Override
        public ValueEvent newInstance()
        {
            return new ValueEvent();
        }
    };
}

// Callback handler which can be implemented by EventProcessors
final EventHandler<ValueEvent> eventHandler = new EventHandler<ValueEvent>()
{
    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch)
        throws Exception
    {
        // process a new event as it becomes available.
    }
};
// Preallocate a single-producer ring buffer; BUFFER_SIZE must be a power of 2
RingBuffer<ValueEvent> ringBuffer =
    RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY,
                                    BUFFER_SIZE,
                                    new SleepingWaitStrategy());

// The barrier is what the processor waits on for newly published sequences
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
BatchEventProcessor<ValueEvent> batchProcessor =
    new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, eventHandler);

// Gate the publisher on the processor's sequence so the ring cannot wrap over unprocessed events
ringBuffer.addGatingSequences(batchProcessor.getSequence());

// Each processor runs on a separate thread
EXECUTOR.submit(batchProcessor);

// Publishers claim events in sequence
long sequence = ringBuffer.next();
ValueEvent event = ringBuffer.get(sequence);
event.setValue(1234);

// publish the event so it is available to EventProcessors
ringBuffer.publish(sequence);
Interface Summary
BatchRewindStrategy: Strategy for handling a RewindableException when processing an event.
Cursored: Implementors of this interface must provide a single long value that represents their current cursor value.
DataProvider<T>: Typically used to decouple classes from RingBuffer to allow easier testing.
EventFactory<T>: Called by the RingBuffer to pre-populate all the events to fill the RingBuffer.
EventHandler<T>: Callback interface to be implemented for processing events as they become available in the RingBuffer.
EventHandlerIdentity
EventPoller.Handler<T>: A callback used to process events.
EventProcessor: An EventProcessor needs to be an implementation of a runnable that will poll for events from the RingBuffer using the appropriate wait strategy.
EventSequencer<T>: Pulls together the low-level data access and sequencing operations of RingBuffer.
EventSink<E>: Write interface for RingBuffer.
EventTranslator<T>: Implementations translate (write) data representations into events claimed from the RingBuffer.
EventTranslatorOneArg<T,A>: Implementations translate another data representation into events claimed from the RingBuffer.
EventTranslatorThreeArg<T,A,B,C>: Implementations translate another data representation into events claimed from the RingBuffer.
EventTranslatorTwoArg<T,A,B>: Implementations translate another data representation into events claimed from the RingBuffer.
EventTranslatorVararg<T>: Implementations translate another data representation into events claimed from the RingBuffer.
ExceptionHandler<T>: Callback handler for uncaught exceptions in the event processing cycle of the BatchEventProcessor.
RewindableEventHandler<T>: Callback interface to be implemented for processing events as they become available in the RingBuffer, with support for throwing a RewindableException when an event cannot be processed currently but may succeed on retry.
RewindHandler
SequenceBarrier: Coordination barrier for tracking the cursor for publishers and sequence of dependent EventProcessors for processing a data structure.
Sequenced: Operations related to the sequencing of items in a RingBuffer.
Sequencer: Coordinates claiming sequences for access to a data structure while tracking dependent Sequences.
WaitStrategy: Strategy employed for making EventProcessors wait on a cursor Sequence.
Class Summary
AbstractSequencer: Base class for the various sequencer types (single/multi).
AggregateEventHandler<T>: An aggregate collection of EventHandlers that get called in sequence for each event.
BatchEventProcessor<T>: Convenience class for handling the batching semantics of consuming entries from a RingBuffer and delegating the available events to an EventHandler.
BatchEventProcessorBuilder
BlockingWaitStrategy: Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier.
BusySpinWaitStrategy: Busy Spin strategy that uses a busy spin loop for EventProcessors waiting on a barrier.
EventPoller<T>: Experimental poll-based interface for the Disruptor.
EventuallyGiveUpBatchRewindStrategy: Strategy for handling a RewindableException that will eventually delegate the exception to the ExceptionHandler after a specified number of attempts have been made.
ExceptionHandlers: Provides static methods for accessing a default ExceptionHandler object.
FatalExceptionHandler: Convenience implementation of an exception handler that uses the standard JDK logging of System.Logger to log the exception as System.Logger.Level.ERROR and re-throw it wrapped in a RuntimeException.
FixedSequenceGroup: Hides a group of Sequences behind a single Sequence.
IgnoreExceptionHandler: Convenience implementation of an exception handler that uses the standard JDK logging of System.Logger to log the exception as System.Logger.Level.INFO.
LiteBlockingWaitStrategy: Variation of the BlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
LiteTimeoutBlockingWaitStrategy: Variation of the TimeoutBlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
MultiProducerSequencer: Coordinator for claiming sequences for access to a data structure while tracking dependent Sequences.
NanosecondPauseBatchRewindStrategy: Strategy for handling a RewindableException that will pause for a specified amount of nanos.
NoOpEventProcessor: No operation version of an EventProcessor that simply tracks a Sequence.
PhasedBackoffWaitStrategy: Phased wait strategy for waiting EventProcessors on a barrier.
RewindableException: A special exception that can be thrown while using the BatchEventProcessor.
RingBuffer<E>: Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
Sequence: Concurrent sequence class used for tracking the progress of the ring buffer and event processors.
SequenceGroup
SimpleBatchRewindStrategy: Batch rewind strategy that always rewinds.
SingleProducerSequencer: Coordinator for claiming sequences for access to a data structure while tracking dependent Sequences.
SleepingWaitStrategy: Sleeping strategy that initially spins, then uses a Thread.yield(), and eventually sleeps (LockSupport.parkNanos(n)) for the minimum number of nanos the OS and JVM will allow while the EventProcessors are waiting on a barrier.
TimeoutBlockingWaitStrategy: Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier.
YieldingWaitStrategy: Yielding strategy that uses a Thread.yield() for EventProcessors waiting on a barrier after initially spinning.
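The spin, yield, then sleep progression described for SleepingWaitStrategy can be pictured with a short JDK-only sketch. This is an illustration of the phased idea and not the library's code: `SleepingWaitSketch`, the retry budget of 200 and the 100 ns park time are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of a phased wait: spin first, then yield, then park briefly.
final class SleepingWaitSketch {
    private static final int RETRIES = 200;       // assumed budget before parking starts
    private static final long SLEEP_NANOS = 100;  // assumed park duration

    // Blocks until the cursor reaches `sequence`, then returns the cursor value that was observed.
    static long waitFor(long sequence, AtomicLong cursor) {
        int counter = RETRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > RETRIES / 2) {
                counter--;
                Thread.onSpinWait();                  // phase 1: busy spin
            } else if (counter > 0) {
                counter--;
                Thread.yield();                       // phase 2: give up the time slice
            } else {
                LockSupport.parkNanos(SLEEP_NANOS);   // phase 3: sleep as briefly as the OS allows
            }
        }
        return available;
    }

    // A publisher thread advances the cursor to 5 after a short delay while the caller waits for 3.
    static long demo() {
        AtomicLong cursor = new AtomicLong(-1);
        Thread publisher = new Thread(() -> {
            LockSupport.parkNanos(5_000_000L);
            cursor.set(5);
        });
        publisher.start();
        return waitFor(3, cursor);
    }
}
```

The returned value can be higher than the requested sequence, which is what allows a processor to handle everything up to that point as one batch.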
Enum Summary
EventPoller.PollState: Indicates the result of a call to EventPoller.poll(Handler).
RewindAction: The result returned from the BatchRewindStrategy that decides whether to rewind or throw the exception.
Exception Summary
AlertException: Used to alert EventProcessors waiting at a SequenceBarrier of status changes.
InsufficientCapacityException: Exception thrown when it is not possible to insert a value into the ring buffer without it wrapping the consuming sequences.
TimeoutException: Wait strategies may throw this Exception to inform callers that a message has not been detected within a specific time window.