The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX’s research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange’s infrastructure.

Using the Disruptor

Introduction

The best way to understand what the Disruptor is, is to compare it to something well understood and quite similar in purpose. In the case of the Disruptor this would be Java’s BlockingQueue. Like a queue the purpose of the Disruptor is to move data (e.g. messages or events) between threads within the same process. However, there are some key features that the Disruptor provides that distinguish it from a queue. They are:

  • Multicast events to consumers, with consumer dependency graph.

  • Pre-allocate memory for events.

  • Optionally lock-free.

Core Concepts

Before we can understand how the Disruptor works, it is worthwhile defining a number of terms that will be used throughout the documentation and the code. For those with a DDD bent, think of this as the ubiquitous language of the Disruptor domain.

  • Ring Buffer: The Ring Buffer is often considered the main aspect of the Disruptor. However, from 3.0 onwards the Ring Buffer is only responsible for storing and updating the data (Events) that move through the Disruptor, and for some advanced use cases it can be completely replaced by the user.

  • Sequence: The Disruptor uses Sequences as a means to identify where a particular component is up to. Each consumer (EventProcessor) maintains a Sequence, as does the Disruptor itself. The majority of the concurrent code relies on the movement of these Sequence values, hence the Sequence supports many of the current features of an AtomicLong. In fact the only real difference between the two is that the Sequence contains additional functionality to prevent false sharing between Sequences and other values.

  • Sequencer: The Sequencer is the real core of the Disruptor. The two implementations of this interface (single producer, multi producer) implement all of the concurrent algorithms used for fast, correct passing of data between producers and consumers.

  • Sequence Barrier: The Sequence Barrier is produced by the Sequencer and contains references to the main published Sequence from the Sequencer and the Sequences of any dependent consumer. It contains the logic to determine if there are any events available for the consumer to process.

  • Wait Strategy: The Wait Strategy determines how a consumer will wait for events to be placed into the Disruptor by a producer. More details are available in the section about being optionally lock-free.

  • Event: The unit of data passed from producer to consumer. There is no specific code representation of the Event as it is defined entirely by the user.

  • EventProcessor: The main event loop for handling events from the Disruptor, which also owns the consumer’s Sequence. There is a single implementation called BatchEventProcessor that contains an efficient implementation of the event loop and will call back onto a user-supplied implementation of the EventHandler interface.

  • EventHandler: An interface that is implemented by the user and represents a consumer for the Disruptor.

  • Producer: This is the user code that calls the Disruptor to enqueue Events. This concept also has no representation in the code.

To put these elements into context, below is an example of how LMAX uses the Disruptor within its high performance core services, e.g. the exchange.

Figure 1. Disruptor with a set of dependent consumers.

Multicast Events

This is the biggest behavioural difference between queues and the Disruptor. When you have multiple consumers listening on the same Disruptor, all events are published to all consumers, in contrast to a queue where a single event will only be sent to a single consumer. This behaviour is intended for cases where you need to perform multiple independent operations in parallel on the same data. The canonical example from LMAX is where we have three operations: journalling (writing the input data to a persistent journal file), replication (sending the input data to another machine to ensure that there is a remote copy of the data), and business logic (the real processing work). Executor-style event processing, where scale is found by processing different events in parallel at the same time, is also possible using the WorkerPool. Note that it is bolted on top of the existing Disruptor classes and is not treated with the same first-class support, hence it may not be the most efficient way to achieve that particular goal.

Looking at Figure 1, it is possible to see that there are 3 Event Handlers (JournalConsumer, ReplicationConsumer and ApplicationConsumer) listening to the Disruptor. Each of these Event Handlers will receive all of the messages available in the Disruptor (in the same order). This allows the work of each of these consumers to proceed in parallel.

Consumer Dependency Graph

To support real world applications of the parallel processing behaviour it was necessary to support co-ordination between the consumers. Referring back to the example described above, it is necessary to prevent the business logic consumer from making progress until the journalling and replication consumers have completed their tasks. We call this concept gating (more precisely, gating is the name of the feature that is a super-set of this behaviour). Gating happens in two places. Firstly, we need to ensure that the producers do not overrun consumers. This is handled by adding the relevant consumers' Sequences to the Disruptor by calling RingBuffer.addGatingSequences(). Secondly, the case referred to previously is implemented by constructing a SequenceBarrier containing Sequences from the components that must complete their processing first.

Referring to Figure 1 there are 3 consumers listening for Events from the Ring Buffer. There is a dependency graph in this example.

The ApplicationConsumer depends on the JournalConsumer and ReplicationConsumer. This means that the JournalConsumer and ReplicationConsumer can run freely in parallel with each other. The dependency relationship can be seen in the connection from the ApplicationConsumer's SequenceBarrier to the Sequences of the JournalConsumer and ReplicationConsumer. It is also worth noting the relationship that the Sequencer has with the downstream consumers. One of its roles is to ensure that publication does not wrap the Ring Buffer. To do this, no downstream consumer may have a Sequence that is lower than the Ring Buffer’s Sequence less the size of the Ring Buffer. However, using the graph of dependencies an interesting optimisation can be made. Because the ApplicationConsumer's Sequence is guaranteed to be less than or equal to the Sequences of the JournalConsumer and ReplicationConsumer (that is what the dependency relationship ensures), the Sequencer need only look at the Sequence of the ApplicationConsumer. In a more general sense, the Sequencer only needs to be aware of the Sequences of the consumers that are the leaf nodes in the dependency tree.
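
The wrap check described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the Disruptor's own code: the class GatingSketch and its methods minimumSequence and wouldWrap are hypothetical names, plain long values stand in for Sequence objects, and a consumer's value is taken to be the last sequence it has finished processing.

```java
final class GatingSketch
{
    // Lowest sequence among the gating consumers (the leaf nodes of the dependency graph).
    static long minimumSequence(long[] gatingSequences)
    {
        long minimum = Long.MAX_VALUE;
        for (long sequence : gatingSequences)
        {
            minimum = Math.min(minimum, sequence);
        }
        return minimum;
    }

    // True if claiming nextSequence would overwrite an entry that the slowest
    // gating consumer has not yet processed.
    static boolean wouldWrap(long nextSequence, int bufferSize, long[] gatingSequences)
    {
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint > minimumSequence(gatingSequences);
    }

    public static void main(String[] args)
    {
        // Only the leaf consumer (e.g. the ApplicationConsumer) needs to be tracked.
        long[] leafConsumers = {5L};
        System.out.println(wouldWrap(13L, 8, leafConsumers)); // false: sequence 5 has been processed
        System.out.println(wouldWrap(14L, 8, leafConsumers)); // true: sequence 6 has not been processed
    }
}
```

Because the leaf consumer can never be ahead of the consumers it depends on, checking its value alone gives the same answer as checking every consumer in the graph.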

Event Preallocation

One of the goals of the Disruptor was to enable use within a low-latency environment. Within low-latency systems it is necessary to reduce or remove memory allocations. In Java-based systems the purpose is to reduce the number of stalls due to garbage collection (in low-latency C/C++ systems, heavy memory allocation is also problematic due to the contention that can be placed on the memory allocator).

To support this the user is able to preallocate the storage required for the events within the Disruptor. During construction an EventFactory is supplied by the user and will be called for each entry in the Disruptor’s Ring Buffer. When publishing new data to the Disruptor the API will allow the user to get hold of the constructed object so that they can call methods or update fields on that stored object. The Disruptor provides guarantees that these operations will be concurrency-safe as long as they are implemented correctly.
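
A minimal sketch of the preallocation idea in plain Java follows. It is not the Disruptor's Ring Buffer: the class PreallocatedSlots is a hypothetical name, and a java.util.function.Supplier stands in for the EventFactory. It assumes a power-of-2 buffer size so that a sequence can be mapped to a slot with a bit mask.

```java
import java.util.function.Supplier;

final class PreallocatedSlots<E>
{
    private final Object[] entries;
    private final int indexMask;

    PreallocatedSlots(Supplier<E> factory, int bufferSize)
    {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        indexMask = bufferSize - 1;
        for (int i = 0; i < bufferSize; i++)
        {
            entries[i] = factory.get(); // every entry is allocated once, up front
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence)
    {
        // For a power-of-2 size, masking gives the same result as sequence % bufferSize.
        return (E) entries[(int) (sequence & indexMask)];
    }

    public static void main(String[] args)
    {
        PreallocatedSlots<StringBuilder> slots = new PreallocatedSlots<>(StringBuilder::new, 8);
        // Sequences 3 and 11 map to the same preallocated object, which is reused.
        System.out.println(slots.get(3L) == slots.get(11L)); // prints true
    }
}
```

After construction no further event objects are created; publishing only mutates the object already stored in the slot.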

Optionally Lock-free

Another key implementation detail pushed by the desire for low latency is the extensive use of lock-free algorithms to implement the Disruptor. All of the memory visibility and correctness guarantees are implemented using memory barriers and/or compare-and-swap operations. There is only one use case where an actual lock is required, and that is within the BlockingWaitStrategy. This is done solely for the purpose of using a Condition so that a consuming thread can be parked while waiting for new events to arrive. Many low-latency systems will use a busy-wait to avoid the jitter that can be incurred by using a Condition. However, in a number of systems busy-wait operations can lead to significant degradation in performance, especially where CPU resources are heavily constrained, e.g. web servers in virtualised environments.
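
As a rough illustration of the compare-and-swap style mentioned above, the sketch below claims sequence numbers without taking a lock. It is an assumption-laden simplification, not the Disruptor's Sequencer: CasClaimSketch is a hypothetical class, a java.util.concurrent.atomic.AtomicLong stands in for the Sequence, and no capacity check against consumers is performed.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CasClaimSketch
{
    // Starts at -1 so that the first claimed sequence is 0.
    private final AtomicLong cursor = new AtomicLong(-1L);

    // Claims the next sequence. If another thread wins the race, the
    // compare-and-swap fails and the loop retries with the fresh value.
    long next()
    {
        long current;
        long next;
        do
        {
            current = cursor.get();
            next = current + 1;
        }
        while (!cursor.compareAndSet(current, next));
        return next;
    }

    public static void main(String[] args)
    {
        CasClaimSketch claimer = new CasClaimSketch();
        System.out.println(claimer.next()); // prints 0
        System.out.println(claimer.next()); // prints 1
    }
}
```

No thread ever blocks here: a thread that loses the race simply retries, which is the property that avoids the jitter associated with locks.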

Getting Started

Getting the Disruptor

The Disruptor jar file is available from Maven Central and can be integrated into your dependency manager of choice from there.

Basic Produce and Consume

To get started with the Disruptor we are going to consider a very simple and contrived example, one that will pass a single long value from a producer to a consumer, where the consumer will simply print out the value. Firstly we will define the Event that will carry the data.

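public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    // Without this, printing the event would show only the default Object.toString() output.
    @Override
    public String toString()
    {
        return "LongEvent{value=" + value + "}";
    }
}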

In order to allow the Disruptor to preallocate these events for us, we need an EventFactory that will perform the construction.

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

Once we have the event defined we need to create a consumer that will handle these events. In our case all we want to do is print the value out to the console.

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

We will need a source for these events. For the sake of an example I am going to assume that the data is coming from some sort of I/O device, e.g. network or file, in the form of a ByteBuffer.

Publishing Using Translators

With version 3.0 of the Disruptor a richer Lambda-style API was added to help developers by encapsulating the complexity of claiming and publishing slots within the Ring Buffer. Post-3.0, the preferred approach for publishing messages is therefore via the Event Publisher/Event Translator portion of the API. E.g.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
import java.nio.ByteBuffer;

public class LongEventProducerWithTranslator
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

The other advantage of this approach is that the translator code can be pulled into a separate class and easily unit tested independently. The Disruptor provides a number of different interfaces (EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.) that can be implemented to provide translators. The reason for this is to allow the translators to be represented as static classes or non-capturing lambdas (from Java 8 onwards), as the arguments to the translation method are passed through the call on the Ring Buffer to the translator.

Publishing Using the Legacy API

There is a more "raw" approach that we can use.

import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

What becomes immediately obvious is that event publication becomes more involved than using a simple queue. This is due to the desire for event preallocation. It requires (at the lowest level) a 2-phase approach to message publication, i.e. claim the slot in the ring buffer then publish the available data. It is also necessary to wrap publication in a try/finally block. If we claim a slot in the Ring Buffer (calling RingBuffer.next()) then we must publish this sequence. Failing to do so can result in corruption of the state of the Disruptor. Specifically, in the multi-producer case this will result in the consumers stalling and being unable to recover without a restart. Therefore it is recommended that the EventTranslator APIs be used.
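
To illustrate why a claimed but unpublished sequence stalls consumers in the multi-producer case, here is a minimal sketch in plain Java. It is not the Disruptor's implementation: the class ContiguousPublishSketch, the method highestPublished and the boolean array standing in for per-slot publication flags are all assumptions made for illustration.

```java
final class ContiguousPublishSketch
{
    // Returns the highest sequence a consumer may read: every sequence from
    // lowerBound up to the result must already be published. Returns
    // lowerBound - 1 when lowerBound itself has not been published.
    static long highestPublished(long lowerBound, long highestClaimed, boolean[] published)
    {
        for (long sequence = lowerBound; sequence <= highestClaimed; sequence++)
        {
            if (!published[(int) sequence])
            {
                return sequence - 1;
            }
        }
        return highestClaimed;
    }

    public static void main(String[] args)
    {
        // Sequences 0..3 were claimed, but the producer that claimed 2 never published it.
        boolean[] published = {true, true, false, true};
        System.out.println(highestPublished(0L, 3L, published)); // prints 1
    }
}
```

Sequence 3 has been published, yet consumers cannot move past sequence 1 until sequence 2 is published as well. This is the situation the try/finally block guards against.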

The final step is to wire the whole thing together. It is possible to wire all of the components manually, however it can be a little bit complicated so a DSL is provided to simplify construction. Some of the more complicated options are not available via the DSL, however it is suitable for most circumstances.

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}

Using Java 8

One of the design influences of the Disruptor’s API was that Java 8 was going to rely on the concept of Functional Interfaces to serve as type declarations for Java Lambdas. Most of the interface definitions in the Disruptor API fit the requirements of a Functional Interface so that a Lambda can be used instead of a custom class, which can reduce the boilerplate required.

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

Note how a number of the classes (e.g. handler, translator) are no longer required. Also note how the lambda used for publishEvent() only refers to the parameters that are passed in. If we were to instead write that code as:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}

This would create a capturing lambda, meaning that it would need to instantiate an object to hold the ByteBuffer bb variable as it passes the lambda through to the publishEvent() call. This will create additional (unnecessary) garbage, so the call that passes the argument through to the lambda should be preferred if low GC pressure is a requirement.
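
The pass-through pattern can be sketched without the Disruptor at all. In the hypothetical code below, PassThroughSketch, its TranslatorOneArg interface, Holder and publish are illustrative stand-ins rather than Disruptor types. The point is only that the lambda refers to nothing but its own parameters, so it can be stored once in a static field and reused for every call.

```java
import java.nio.ByteBuffer;

final class PassThroughSketch
{
    // Hypothetical stand-in for a one-argument translator interface.
    interface TranslatorOneArg<E, A>
    {
        void translateTo(E event, long sequence, A arg);
    }

    // Hypothetical stand-in for a preallocated event.
    static final class Holder
    {
        long value;
    }

    // Refers only to its parameters, so it captures no surrounding variables.
    static final TranslatorOneArg<Holder, ByteBuffer> TRANSLATOR =
        (event, sequence, buffer) -> event.value = buffer.getLong(0);

    // The argument travels through the publishing call to the translator,
    // instead of being captured by the lambda at the call site.
    static <A> void publish(Holder slot, long sequence, TranslatorOneArg<Holder, A> translator, A arg)
    {
        translator.translateTo(slot, sequence, arg);
    }

    public static void main(String[] args)
    {
        Holder slot = new Holder();
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(0, 42L);
        publish(slot, 0L, TRANSLATOR, bb);
        System.out.println(slot.value); // prints 42
    }
}
```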

Given that method references can be used instead of anonymous lambdas, it is possible to rewrite the example in this fashion.

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
    {
        event.set(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(LongEventMain::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1000);
        }
    }
}

Basic Tuning Options

Using the above approach will work functionally in the widest set of deployment scenarios. However, if you are able to make certain assumptions about the hardware and software environment that the Disruptor will run in, then you can take advantage of a number of tuning options to improve performance. There are 2 main options for tuning: single vs. multiple producers and alternative wait strategies.

Single vs. Multiple Producers

One of the best ways to improve performance in concurrent systems is to adhere to the Single Writer Principle, and this applies to the Disruptor. If you are in the situation where there will only ever be a single thread producing events into the Disruptor, then you can take advantage of this to gain additional performance.

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor<>(
            factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
        //.....
    }
}

To give an indication of how much of a performance advantage can be achieved through this technique we can change the producer type in the OneToOne performance test. Tests run on i7 Sandy Bridge MacBook Air.

Table 1. Multiple Producer

  Run 0    Disruptor=26,553,372 ops/sec
  Run 1    Disruptor=28,727,377 ops/sec
  Run 2    Disruptor=29,806,259 ops/sec
  Run 3    Disruptor=29,717,682 ops/sec
  Run 4    Disruptor=28,818,443 ops/sec
  Run 5    Disruptor=29,103,608 ops/sec
  Run 6    Disruptor=29,239,766 ops/sec

Table 2. Single Producer

  Run 0    Disruptor=89,365,504 ops/sec
  Run 1    Disruptor=77,579,519 ops/sec
  Run 2    Disruptor=78,678,206 ops/sec
  Run 3    Disruptor=80,840,743 ops/sec
  Run 4    Disruptor=81,037,277 ops/sec
  Run 5    Disruptor=81,168,831 ops/sec
  Run 6    Disruptor=81,699,346 ops/sec

Alternative Wait Strategies

The default wait strategy used by the Disruptor is the BlockingWaitStrategy. Internally the BlockingWaitStrategy uses a typical lock and condition variable to handle thread wake-up. The BlockingWaitStrategy is the slowest of the available wait strategies, but is the most conservative with respect to CPU usage and will give the most consistent behaviour across the widest variety of deployment options. However, knowledge of the deployed system can again allow for additional performance.

  • SleepingWaitStrategy

Like the BlockingWaitStrategy, the SleepingWaitStrategy attempts to be conservative with CPU usage. It uses a simple busy-wait loop, but with a call to LockSupport.parkNanos(1) in the middle of the loop. On a typical Linux system this will pause the thread for around 60µs. This has the benefit that the producing thread does not need to take any action other than incrementing the appropriate counter, and does not incur the cost of signalling a condition variable. However, the mean latency of moving the event between the producer and consumer threads will be higher. It works best in situations where low latency is not required, but a low impact on the producing thread is desired. A common use case is asynchronous logging.

  • YieldingWaitStrategy

The YieldingWaitStrategy is one of 2 Wait Strategies that can be used in low-latency systems, where there is the option to burn CPU cycles with the goal of improving latency. The YieldingWaitStrategy will busy spin waiting for the sequence to increment to the appropriate value. Inside the body of the loop Thread.yield() will be called, allowing other queued threads to run. This is the recommended wait strategy when you need very high performance and the number of Event Handler threads is less than the total number of logical cores, e.g. you have hyper-threading enabled.

  • BusySpinWaitStrategy

The BusySpinWaitStrategy is the highest performing Wait Strategy, but puts the highest constraints on the deployment environment. This wait strategy should only be used if the number of Event Handler threads is smaller than the number of physical cores on the box. E.g. hyper-threading should be disabled.
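
The difference between the non-blocking strategies comes down to what the consumer does inside its polling loop. The sketch below is a deliberately simplified illustration in plain Java, not the Disruptor's WaitStrategy implementations (which are more refined): WaitLoopSketch, its Mode enum and waitFor are hypothetical names, and an AtomicLong stands in for the published Sequence.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

final class WaitLoopSketch
{
    enum Mode { BUSY_SPIN, YIELDING, SLEEPING }

    // Polls until the cursor has reached the requested sequence, then
    // returns the cursor value that was observed.
    static long waitFor(long sequence, AtomicLong cursor, Mode mode)
    {
        long available;
        while ((available = cursor.get()) < sequence)
        {
            switch (mode)
            {
                case BUSY_SPIN:
                    break; // poll again immediately, never giving up the core
                case YIELDING:
                    Thread.yield(); // let other runnable threads have the core
                    break;
                case SLEEPING:
                    LockSupport.parkNanos(1L); // briefly park to save CPU
                    break;
            }
        }
        return available;
    }

    public static void main(String[] args)
    {
        AtomicLong cursor = new AtomicLong(-1L);
        // Stand-in for a producer publishing sequence 0 from another thread.
        new Thread(() -> cursor.set(0L)).start();
        System.out.println(waitFor(0L, cursor, Mode.YIELDING)); // prints 0
    }
}
```

In all three modes the producer only has to update the counter. The trade-off is on the consumer side, between CPU burned while waiting and the delay before a new value is noticed.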

Clearing Objects From the Ring Buffer

When passing data via the Disruptor, it is possible for objects to live longer than intended. To avoid this happening it may be necessary to clear out the event after processing it. If you have a single event handler, clearing out the value within the same handler is sufficient. If you have a chain of event handlers, then you may need a specific handler placed at the end of the chain to handle clearing out the object.

class ObjectEvent<T>
{
    T val;

    void clear()
    {
        val = null;
    }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
    {
        // Failing to call clear here will result in the
        // object associated with the event to live until
        // it is overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear();
    }
}

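public static void main(String[] args)
{
    int bufferSize = 1024; // must be a power of 2

    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> new ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);

    // ProcessingEventHandler stands for the user's own EventHandler<ObjectEvent<String>>.
    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingEventHandler<String>());
}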

Advanced Techniques

Dealing With Large Batches

Early Release Example

package com.lmax.disruptor.example;

import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;
import com.lmax.disruptor.support.LongEvent;

public class EarlyReleaseHandler implements SequenceReportingEventHandler<LongEvent>
{
    private Sequence sequenceCallback;
    private int batchRemaining = 20;

    @Override
    public void setSequenceCallback(Sequence sequenceCallback)
    {
        this.sequenceCallback = sequenceCallback;
    }

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception
    {
        processEvent(event);

        boolean logicalChunkOfWorkComplete = isLogicalChunkOfWorkComplete();
        if (logicalChunkOfWorkComplete)
        {
            sequenceCallback.set(sequence);
        }

        batchRemaining = logicalChunkOfWorkComplete || endOfBatch ? 20 : batchRemaining;
    }

    private boolean isLogicalChunkOfWorkComplete()
    {
        // Return true or false based on whatever criteria are required for the smaller
        // chunk.  If this is doing I/O, it may be after flushing/syncing to disk
        // or at the end of DB batch+commit.
        // Or it could simply be working off a smaller batch size.

        return --batchRemaining == -1;
    }

    private void processEvent(LongEvent event)
    {
        // Do processing
    }
}

Developing the Disruptor

A guide to checking out the code, building it and running the performance tests

Getting Started

We have switched from ant to gradle. More detailed instructions on how to work on the disruptor using gradle will follow. To get started list the available tasks by issuing:

$ ./gradlew tasks

and take it from there.

No gradle install is necessary as gradle is self-provisioning.

Basic workflow

  1. Check out the project locally to your machine

    $ cd ${MY_PROJECTS_HOME}
    $ git clone https://github.com/LMAX-Exchange/disruptor.git

  2. Build a distribution

    $ cd ${MY_PROJECTS_HOME}/disruptor
    $ ./gradlew

    As a result of the build you should find the following files:

    • ${MY_PROJECTS_HOME}/disruptor/build/libs/disruptor-{VERSION_NUMBER}-SNAPSHOT.jar

    • ${MY_PROJECTS_HOME}/disruptor/build/libs/disruptor-{VERSION_NUMBER}-SNAPSHOT-javadoc.jar

    • ${MY_PROJECTS_HOME}/disruptor/build/libs/disruptor-{VERSION_NUMBER}-SNAPSHOT-sources.jar

  3. Run the performance tests

    $ cd ${MY_PROJECTS_HOME}/disruptor
    $ ./gradlew perfJar
    $ java -cp build/libs/disruptor-perf-3.3.0.jar [test name, e.g. com.lmax.disruptor.sequenced.OneToOneSequencedThroughputTest]

    There are also JMH Benchmarks for testing the performance of the Disruptor. These can be run with a gradle command:

    $ ./gradlew jmh

  4. Development tips

    This project uses GitHub Actions for CI which runs the gradle build task to check the project can be built on each commit/pull request.

    So that you don’t push changes that will fail the simple test & check steps we would suggest adding the git pre-commit hook which will block any commit you try to do locally if the code fails these steps.

    The git hook can be set up using the following gradle task:

    $ ./gradlew setUpGitHooks

Design and Implementation

  • Single Producer Algorithm

  • Multiple Producer Algorithm

Known Issues

  • On 32 bit Linux systems it appears that LockSupport.parkNanos() is quite expensive, therefore using the SleepingWaitStrategy is not recommended.