CatapultServer  v0.5.0.1 (Elephant)
BrokerMessageReaders.h
Go to the documentation of this file.
1 
21 #pragma once
23 #include "catapult/io/FileQueue.h"
25 
26 namespace catapult { namespace subscribers {
27 
28  // region Flusher
29 
30  namespace detail {
31  template<typename TSubscriber, typename = void>
32  struct Flusher {
33  static void Flush(const TSubscriber&)
34  {}
35  };
36 
37  template<typename TSubscriber>
38  struct Flusher<TSubscriber, utils::traits::is_type_expression_t<decltype(reinterpret_cast<TSubscriber*>(0)->flush())>> {
39  static void Flush(TSubscriber& subscriber) {
40  subscriber.flush();
41  }
42  };
43  }
44 
45  // endregion
46 
48  template<typename TSubscriber, typename TMessageReader>
49  void ReadAll(io::InputStream& inputStream, TSubscriber& subscriber, TMessageReader readNextMessage) {
50  while (!inputStream.eof())
51  readNextMessage(inputStream, subscriber);
52 
54  }
55 
57  template<typename TSubscriber, typename TMessageReader>
58  void ReadAll(io::FileQueueReader& reader, TSubscriber& subscriber, TMessageReader readNextMessage) {
59  bool shouldContinue = true;
60  while (shouldContinue) {
61  shouldContinue = reader.tryReadNextMessage([&subscriber, readNextMessage](const auto& buffer) {
63  ReadAll(inputStream, subscriber, readNextMessage);
64  });
65  }
66  }
67 
71  std::string QueuePath;
72 
74  std::string IndexReaderFilename;
75 
77  std::string IndexWriterFilename;
78  };
79 
81  template<typename TSubscriber, typename TMessageReader>
82  void ReadAll(const MessageQueueDescriptor& descriptor, TSubscriber& subscriber, TMessageReader readNextMessage) {
83  io::FileQueueReader reader(descriptor.QueuePath, descriptor.IndexReaderFilename, descriptor.IndexWriterFilename);
84 
85  auto numPendingMessages = reader.pending();
86  if (0 == numPendingMessages)
87  return;
88 
89  CATAPULT_LOG(debug) << "preparing to process " << numPendingMessages << " messages from " << descriptor.QueuePath;
90  subscribers::ReadAll(reader, subscriber, readNextMessage);
91  }
92 }}
CATAPULT_LOG
#define CATAPULT_LOG(SEV)
Writes a log entry to the default logger with SEV severity.
Definition: Logging.h:340
FileQueue.h
Parser.debug
def debug(*args)
Definition: Parser.py:46
catapult::subscribers::ReadAll
void ReadAll(io::InputStream &inputStream, TSubscriber &subscriber, TMessageReader readNextMessage)
Reads all messages from inputStream into subscriber using readNextMessage.
Definition: BrokerMessageReaders.h:49
catapult::subscribers::MessageQueueDescriptor::IndexWriterFilename
std::string IndexWriterFilename
Name of index writer file.
Definition: BrokerMessageReaders.h:77
catapult::subscribers::MessageQueueDescriptor::IndexReaderFilename
std::string IndexReaderFilename
Name of index reader file.
Definition: BrokerMessageReaders.h:74
catapult::io::InputStream
Reader interface.
Definition: Stream.h:27
catapult::io::FileQueueReader
File based queue reader where each message is represented by a file (with incrementing names) in a di...
Definition: FileQueue.h:51
catapult::utils::traits::is_type_expression_t
typename is_type_expression< T, Enable >::type is_type_expression_t
true if the expression is valid and evaluates to a type, false otherwise.
Definition: Traits.h:98
catapult::io::BufferInputStreamAdapter
Adapt a typed buffer to be used as an input stream.
Definition: BufferInputStreamAdapter.h:30
Traits.h
catapult::io::InputStream::eof
virtual bool eof() const =0
Returns true if no data is left in the stream.
catapult::io::FileQueueReader::tryReadNextMessage
bool tryReadNextMessage(const consumer< std::vector< uint8_t >> &consumer)
Tries to read the next message and forwards it to consumer if successful.
Definition: FileQueue.cpp:114
catapult
Definition: AddressExtractionExtension.cpp:28
catapult::subscribers::MessageQueueDescriptor::QueuePath
std::string QueuePath
Path of the message queue.
Definition: BrokerMessageReaders.h:71
catapult::io::FileQueueReader::pending
size_t pending() const
Gets the number of pending messages.
Definition: FileQueue.cpp:108
catapult::subscribers::detail::Flusher
Definition: BrokerMessageReaders.h:32
catapult::subscribers::MessageQueueDescriptor
Describes a message queue.
Definition: BrokerMessageReaders.h:69
BufferInputStreamAdapter.h
catapult::subscribers::detail::Flusher::Flush
static void Flush(const TSubscriber &)
Definition: BrokerMessageReaders.h:33