CatapultServer  v0.5.0.1 (Elephant)
ConsumerDispatcher.h
Go to the documentation of this file.
1 
21 #pragma once
23 #include "Disruptor.h"
24 #include "DisruptorConsumer.h"
25 #include "DisruptorInspector.h"
27 #include <boost/thread.hpp>
28 #include <atomic>
29 
30 namespace catapult { namespace disruptor { class ConsumerEntry; } }
31 
32 namespace catapult { namespace disruptor {
33 
36  public:
41  const ConsumerDispatcherOptions& options,
42  const std::vector<DisruptorConsumer>& consumers,
43  const DisruptorInspector& inspector);
44 
46  explicit ConsumerDispatcher(const ConsumerDispatcherOptions& options, const std::vector<DisruptorConsumer>& consumers);
47 
49 
50  public:
52  void shutdown();
53 
55  bool isRunning() const;
56 
58  size_t size() const;
59 
62  DisruptorElementId processElement(ConsumerInput&& input, const ProcessingCompleteFunc& processingComplete);
63 
66 
68  size_t numAddedElements() const;
69 
71  size_t numActiveElements() const;
72 
73  private:
74  DisruptorElement* tryNext(ConsumerEntry& consumerEntry);
75 
76  void advance(ConsumerEntry& consumerEntry);
77 
78  bool canProcessNextElement() const;
79 
80  ProcessingCompleteFunc wrap(const ProcessingCompleteFunc& processingComplete);
81 
82  private:
85  std::atomic_bool m_keepRunning;
89  boost::thread_group m_threads;
90  std::atomic<size_t> m_numActiveElements;
91 
92  utils::SpinLock m_addSpinLock; // lock to serialize access to Disruptor::add
93  };
94 }}
ConsumerEntry.h
catapult::disruptor::ConsumerDispatcher::m_threads
boost::thread_group m_threads
Definition: ConsumerDispatcher.h:89
catapult::disruptor::ConsumerInput
Consumer input composed of a range of entities augmented with metadata.
Definition: ConsumerInput.h:30
catapult::disruptor::ConsumerDispatcher::ConsumerDispatcher
ConsumerDispatcher(const ConsumerDispatcherOptions &options, const std::vector< DisruptorConsumer > &consumers, const DisruptorInspector &inspector)
Definition: ConsumerDispatcher.cpp:53
CATAPULT_LOG
#define CATAPULT_LOG(SEV)
Writes a log entry to the default logger with SEV severity.
Definition: Logging.h:340
catapult::disruptor::DisruptorBarriers
Container for disruptor barriers.
Definition: DisruptorBarriers.h:30
ThreadInfo.h
catapult::disruptor::DisruptorBarriers::size
size_t size() const
Returns number of barriers.
Definition: DisruptorBarriers.h:37
catapult::disruptor::ConsumerDispatcher::advance
void advance(ConsumerEntry &consumerEntry)
Definition: ConsumerDispatcher.cpp:128
catapult::disruptor::ConsumerEntry::position
PositionType position() const
Returns current position (in the circular buffer).
Definition: ConsumerEntry.h:42
catapult::disruptor::ConsumerDispatcher::m_disruptor
Disruptor m_disruptor
Definition: ConsumerDispatcher.h:87
catapult::disruptor::ConsumerDispatcher::m_elementTraceInterval
size_t m_elementTraceInterval
Definition: ConsumerDispatcher.h:83
catapult::disruptor::ConsumerDispatcher::isRunning
bool isRunning() const
Returns true if dispatcher is running, false otherwise.
Definition: ConsumerDispatcher.cpp:98
catapult::disruptor::ConsumerDispatcher::m_numActiveElements
std::atomic< size_t > m_numActiveElements
Definition: ConsumerDispatcher.h:90
catapult::utils::NamedObjectMixin
Mixin to have named objects.
Definition: NamedObject.h:28
catapult::disruptor::ConsumerDispatcher::numActiveElements
size_t numActiveElements() const
Returns the number of elements currently in the disruptor.
Definition: ConsumerDispatcher.cpp:110
ConsumerDispatcher.h
colorPrint.warning
def warning(*args)
Definition: colorPrint.py:10
catapult::disruptor::ConsumerDispatcher::m_inspector
DisruptorInspector m_inspector
Definition: ConsumerDispatcher.h:88
catapult::disruptor::Disruptor::elementAt
DisruptorElement & elementAt(PositionType position)
Gets element at given position.
Definition: Disruptor.h:51
catapult::disruptor::ConsumerDispatcher
Dispatcher for disruptor consumers.
Definition: ConsumerDispatcher.h:35
catapult::disruptor::ConsumerEntry
Holds information about a consumer.
Definition: ConsumerEntry.h:27
catapult::disruptor::Disruptor::isSkipped
bool isSkipped(PositionType position) const
Checks skip flag on the element at position.
Definition: Disruptor.cpp:52
catapult::disruptor::ConsumerDispatcher::~ConsumerDispatcher
~ConsumerDispatcher()
Definition: ConsumerDispatcher.cpp:89
catapult::disruptor::Disruptor::added
uint64_t added() const
Gets the number of total elements added to the disruptor.
Definition: Disruptor.h:66
catapult::disruptor::CompletionStatus::Aborted
Processing of the entity was aborted by a consumer.
catapult::disruptor::DisruptorElement
Augments consumer input with disruptor metadata.
Definition: DisruptorElement.h:28
catapult::utils::SpinLock
Definition: SpinLock.h:31
catapult::disruptor::ConsumerDispatcherOptions::DispatcherName
const char * DispatcherName
Name of the dispatcher.
Definition: ConsumerDispatcherOptions.h:39
ConsumerDispatcherOptions.h
Disruptor.h
DisruptorConsumer.h
catapult::disruptor::ConsumerDispatcherOptions
Consumer dispatcher options.
Definition: ConsumerDispatcherOptions.h:27
DisruptorInspector.h
catapult::disruptor::ConsumerDispatcher::numAddedElements
size_t numAddedElements() const
Returns the total number of elements added to the disruptor.
Definition: ConsumerDispatcher.cpp:106
catapult::disruptor::DisruptorElementId
uint64_t DisruptorElementId
Id of a disruptor element.
Definition: DisruptorTypes.h:32
catapult::disruptor::DisruptorInspector
consumer< ConsumerInput &, const ConsumerCompletionResult & > DisruptorInspector
A disruptor inspector function.
Definition: DisruptorInspector.h:28
Functional.h
catapult::disruptor::ConsumerDispatcher::m_keepRunning
std::atomic_bool m_keepRunning
Definition: ConsumerDispatcher.h:85
catapult::thread::SetThreadName
void SetThreadName(const std::string &name)
Definition: ThreadInfo.cpp:69
catapult::disruptor::ConsumerDispatcher::tryNext
DisruptorElement * tryNext(ConsumerEntry &consumerEntry)
Definition: ConsumerDispatcher.cpp:114
catapult::disruptor::ConsumerDispatcher::canProcessNextElement
bool canProcessNextElement() const
Definition: ConsumerDispatcher.cpp:143
forwardsValidation.info
def info(*args)
Definition: forwardsValidation.py:12
NamedObject.h
size
uint64_t size
Definition: MemoryCounters.cpp:65
catapult::disruptor::ProcessingCompleteFunc
consumer< DisruptorElementId, const ConsumerCompletionResult & > ProcessingCompleteFunc
Function signature for signaling that processing finished.
Definition: DisruptorTypes.h:117
catapult::disruptor::ConsumerDispatcher::m_shouldThrowIfFull
bool m_shouldThrowIfFull
Definition: ConsumerDispatcher.h:84
catapult::disruptor::ConsumerEntry::level
size_t level() const
Returns consumer level.
Definition: ConsumerEntry.h:47
catapult::disruptor::ConsumerDispatcher::processElement
DisruptorElementId processElement(ConsumerInput &&input, const ProcessingCompleteFunc &processingComplete)
Definition: ConsumerDispatcher.cpp:163
catapult::disruptor::ConsumerDispatcher::m_addSpinLock
utils::SpinLock m_addSpinLock
Definition: ConsumerDispatcher.h:92
catapult::disruptor::Disruptor
Disruptor wraps around CircularBuffer for usage within Consumer Dispatcher.
Definition: Disruptor.h:33
CATAPULT_THROW_RUNTIME_ERROR
#define CATAPULT_THROW_RUNTIME_ERROR(MESSAGE)
Macro used to throw a catapult runtime error.
Definition: exceptions.h:167
catapult::disruptor::Disruptor::add
DisruptorElementId add(ConsumerInput &&input, const ProcessingCompleteFunc &processingComplete)
Definition: Disruptor.cpp:39
catapult::disruptor::ConsumerEntry::advance
PositionType advance()
Advances the position.
Definition: ConsumerEntry.h:36
CATAPULT_THROW_INVALID_ARGUMENT
#define CATAPULT_THROW_INVALID_ARGUMENT(MESSAGE)
Macro used to throw a catapult invalid argument.
Definition: exceptions.h:179
catapult
Definition: AddressExtractionExtension.cpp:28
catapult::disruptor::ConsumerDispatcher::wrap
ProcessingCompleteFunc wrap(const ProcessingCompleteFunc &processingComplete)
Definition: ConsumerDispatcher.cpp:156
catapult::disruptor::ConsumerDispatcher::shutdown
void shutdown()
Shuts down dispatcher and stops all threads.
Definition: ConsumerDispatcher.cpp:93
catapult::disruptor::IsIntervalElementId
constexpr bool IsIntervalElementId(DisruptorElementId id, size_t interval)
Returns true if id matches interval.
Definition: DisruptorElement.h:99
catapult::disruptor::ConsumerDispatcher::m_barriers
DisruptorBarriers m_barriers
Definition: ConsumerDispatcher.h:86
catapult::consumer
std::function< void(TArgs...)> consumer
A consumer function.
Definition: functions.h:35
catapult::utils::SpinLockGuard
std::lock_guard< SpinLock > SpinLockGuard
A spin lock guard.
Definition: SpinLock.h:60
catapult::disruptor::Disruptor::capacity
size_t capacity() const
Gets the capacity of the disruptor.
Definition: Disruptor.h:61
catapult::disruptor::ConsumerDispatcher::size
size_t size() const
Returns the number of registered consumers.
Definition: ConsumerDispatcher.cpp:102