Class Disruptor<T>

  • Type Parameters:
    T - the type of event used.

    public class Disruptor<T>
    extends java.lang.Object
    A DSL-style API for setting up the disruptor pattern around a ring buffer (aka the Builder pattern).

    A simple example of setting up the disruptor with two event handlers that must process events in order:

     Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(MyEvent.FACTORY, 32, Executors.newCachedThreadPool());
     EventHandler<MyEvent> handler1 = new EventHandler<MyEvent>() { ... };
     EventHandler<MyEvent> handler2 = new EventHandler<MyEvent>() { ... };
     disruptor.handleEventsWith(handler1);
     disruptor.after(handler1).handleEventsWith(handler2);
    
     RingBuffer ringBuffer = disruptor.start();
     
    • Constructor Detail

      • Disruptor

        public Disruptor​(EventFactory<T> eventFactory,
                         int ringBufferSize,
                         java.util.concurrent.ThreadFactory threadFactory)
        Create a new Disruptor. Will default to BlockingWaitStrategy and ProducerType.MULTI
        Parameters:
        eventFactory - the factory to create events in the ring buffer.
        ringBufferSize - the size of the ring buffer.
        threadFactory - a ThreadFactory to create threads to for processors.
      • Disruptor

        public Disruptor​(EventFactory<T> eventFactory,
                         int ringBufferSize,
                         java.util.concurrent.ThreadFactory threadFactory,
                         ProducerType producerType,
                         WaitStrategy waitStrategy)
        Create a new Disruptor.
        Parameters:
        eventFactory - the factory to create events in the ring buffer.
        ringBufferSize - the size of the ring buffer, must be power of 2.
        threadFactory - a ThreadFactory to create threads for processors.
        producerType - the claim strategy to use for the ring buffer.
        waitStrategy - the wait strategy to use for the ring buffer.
    • Method Detail

      • handleEventsWith

        @SafeVarargs
        public final EventHandlerGroup<T> handleEventsWith​(EventHandler<? super T>... handlers)

        Set up event handlers to handle events from the ring buffer. These handlers will process events as soon as they become available, in parallel.

        This method can be used as the start of a chain. For example if the handler A must process events before handler B:

        dw.handleEventsWith(A).then(B);

        This call is additive, but generally should only be called once when setting up the Disruptor instance

        Parameters:
        handlers - the event handlers that will process events.
        Returns:
        a EventHandlerGroup that can be used to chain dependencies.
      • handleEventsWith

        @SafeVarargs
        public final EventHandlerGroup<T> handleEventsWith​(BatchRewindStrategy batchRewindStrategy,
                                                           RewindableEventHandler<? super T>... handlers)

        Set up event handlers to handle events from the ring buffer. These handlers will process events as soon as they become available, in parallel.

        This method can be used as the start of a chain. For example if the handler A must process events before handler B:

        dw.handleEventsWith(A).then(B);

        This call is additive, but generally should only be called once when setting up the Disruptor instance

        Parameters:
        batchRewindStrategy - a BatchRewindStrategy for customizing how to handle a RewindableException.
        handlers - the rewindable event handlers that will process events.
        Returns:
        a EventHandlerGroup that can be used to chain dependencies.
      • handleEventsWith

        @SafeVarargs
        public final EventHandlerGroup<T> handleEventsWith​(EventProcessorFactory<T>... eventProcessorFactories)

        Set up custom event processors to handle events from the ring buffer. The Disruptor will automatically start these processors when start() is called.

        This method can be used as the start of a chain. For example if the handler A must process events before handler B:

        dw.handleEventsWith(A).then(B);

        Since this is the start of the chain, the processor factories will always be passed an empty Sequence array, so the factory isn't necessary in this case. This method is provided for consistency with EventHandlerGroup.handleEventsWith(EventProcessorFactory...) and EventHandlerGroup.then(EventProcessorFactory...) which do have barrier sequences to provide.

        This call is additive, but generally should only be called once when setting up the Disruptor instance

        Parameters:
        eventProcessorFactories - the event processor factories to use to create the event processors that will process events.
        Returns:
        a EventHandlerGroup that can be used to chain dependencies.
      • handleEventsWith

        public EventHandlerGroup<T> handleEventsWith​(EventProcessor... processors)

        Set up custom event processors to handle events from the ring buffer. The Disruptor will automatically start this processors when start() is called.

        This method can be used as the start of a chain. For example if the processor A must process events before handler B:

        dw.handleEventsWith(A).then(B);
        Parameters:
        processors - the event processors that will process events.
        Returns:
        a EventHandlerGroup that can be used to chain dependencies.
      • handleExceptionsWith

        @Deprecated
        public void handleExceptionsWith​(ExceptionHandler<? super T> exceptionHandler)
        Deprecated.
        This method only applies to future event handlers. Use setDefaultExceptionHandler instead which applies to existing and new event handlers.

        Specify an exception handler to be used for any future event handlers.

        Note that only event handlers set up after calling this method will use the exception handler.

        Parameters:
        exceptionHandler - the exception handler to use for any future EventProcessor.
      • setDefaultExceptionHandler

        public void setDefaultExceptionHandler​(ExceptionHandler<? super T> exceptionHandler)

        Specify an exception handler to be used for event handlers and worker pools created by this Disruptor.

        The exception handler will be used by existing and future event handlers and worker pools created by this Disruptor instance.

        Parameters:
        exceptionHandler - the exception handler to use.
      • handleExceptionsFor

        public ExceptionHandlerSetting<T> handleExceptionsFor​(EventHandlerIdentity eventHandler)
        Override the default exception handler for a specific handler.
        disruptorWizard.handleExceptionsIn(eventHandler).with(exceptionHandler);
        Parameters:
        eventHandler - the event handler to set a different exception handler for.
        Returns:
        an ExceptionHandlerSetting dsl object - intended to be used by chaining the with method call.
      • after

        public final EventHandlerGroup<T> after​(EventHandlerIdentity... handlers)

        Create a group of event handlers to be used as a dependency. For example if the handler A must process events before handler B:

        dw.after(A).handleEventsWith(B);
        Parameters:
        handlers - the event handlers, previously set up with handleEventsWith(EventHandler[]), that will form the barrier for subsequent handlers or processors.
        Returns:
        an EventHandlerGroup that can be used to setup a dependency barrier over the specified event handlers.
      • publishEvent

        public void publishEvent​(EventTranslator<T> eventTranslator)
        Publish an event to the ring buffer.
        Parameters:
        eventTranslator - the translator that will load data into the event.
      • publishEvent

        public <A> void publishEvent​(EventTranslatorOneArg<T,​A> eventTranslator,
                                     A arg)
        Publish an event to the ring buffer.
        Type Parameters:
        A - Class of the user supplied argument.
        Parameters:
        eventTranslator - the translator that will load data into the event.
        arg - A single argument to load into the event
      • publishEvents

        public <A> void publishEvents​(EventTranslatorOneArg<T,​A> eventTranslator,
                                      A[] arg)
        Publish a batch of events to the ring buffer.
        Type Parameters:
        A - Class of the user supplied argument.
        Parameters:
        eventTranslator - the translator that will load data into the event.
        arg - An array single arguments to load into the events. One Per event.
      • publishEvent

        public <A,​B> void publishEvent​(EventTranslatorTwoArg<T,​A,​B> eventTranslator,
                                             A arg0,
                                             B arg1)
        Publish an event to the ring buffer.
        Type Parameters:
        A - Class of the user supplied argument.
        B - Class of the user supplied argument.
        Parameters:
        eventTranslator - the translator that will load data into the event.
        arg0 - The first argument to load into the event
        arg1 - The second argument to load into the event
      • publishEvent

        public <A,​B,​C> void publishEvent​(EventTranslatorThreeArg<T,​A,​B,​C> eventTranslator,
                                                     A arg0,
                                                     B arg1,
                                                     C arg2)
        Publish an event to the ring buffer.
        Type Parameters:
        A - Class of the user supplied argument.
        B - Class of the user supplied argument.
        C - Class of the user supplied argument.
        Parameters:
        eventTranslator - the translator that will load data into the event.
        arg0 - The first argument to load into the event
        arg1 - The second argument to load into the event
        arg2 - The third argument to load into the event
      • start

        public RingBuffer<T> start()

        Starts the event processors and returns the fully configured ring buffer.

        The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.

        This method must only be called once after all event processors have been added.

        Returns:
        the configured ring buffer.
      • halt

        public void halt()
        Calls EventProcessor.halt() on all of the event processors created via this disruptor.
      • shutdown

        public void shutdown()

        Waits until all events currently in the disruptor have been processed by all event processors and then halts the processors. It is critical that publishing to the ring buffer has stopped before calling this method, otherwise it may never return.

        This method will not shutdown the executor, nor will it await the final termination of the processor threads.

      • shutdown

        public void shutdown​(long timeout,
                             java.util.concurrent.TimeUnit timeUnit)
                      throws TimeoutException

        Waits until all events currently in the disruptor have been processed by all event processors and then halts the processors.

        This method will not shutdown the executor, nor will it await the final termination of the processor threads.

        Parameters:
        timeout - the amount of time to wait for all events to be processed. -1 will give an infinite timeout
        timeUnit - the unit the timeOut is specified in
        Throws:
        TimeoutException - if a timeout occurs before shutdown completes.
      • getRingBuffer

        public RingBuffer<T> getRingBuffer()
        The RingBuffer used by this Disruptor. This is useful for creating custom event processors if the behaviour of BatchEventProcessor is not suitable.
        Returns:
        the ring buffer used by this Disruptor.
      • getCursor

        public long getCursor()
        Get the value of the cursor indicating the published sequence.
        Returns:
        value of the cursor for events that have been published.
      • getBufferSize

        public long getBufferSize()
        The capacity of the data structure to hold entries.
        Returns:
        the size of the RingBuffer.
        See Also:
        Sequenced.getBufferSize()
      • get

        public T get​(long sequence)
        Get the event for a given sequence in the RingBuffer.
        Parameters:
        sequence - for the event.
        Returns:
        event for the sequence.
        See Also:
        RingBuffer.get(long)
      • getSequenceValueFor

        public long getSequenceValueFor​(EventHandlerIdentity handler)
        Gets the sequence value for the specified event handlers.
        Parameters:
        handler - eventHandler to get the sequence for.
        Returns:
        eventHandler's sequence
      • hasStarted

        public boolean hasStarted()
        Checks if disruptor has been started
        Returns:
        true when start has been called on this instance; otherwise false
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object