CatapultServer  v0.5.0.1 (Elephant)
ParallelFor.h
Go to the documentation of this file.
1 
21 #pragma once
22 #include "Future.h"
23 #include <boost/asio.hpp>
24 
25 namespace catapult { namespace thread {
26 
29  template<typename TItems, typename TWorkCallback>
31  boost::asio::io_context& ioContext,
32  TItems& items,
33  size_t numPartitions,
34  TWorkCallback callback) {
35  // region ParallelContext
36  class ParallelContext {
37  public:
38  ParallelContext() : m_numOutstandingOperations(1) // note that the work partitioning is the initial operation
39  {}
40 
41  public:
42  auto future() {
43  return m_promise.get_future();
44  }
45 
46  public:
47  void incrementOutstandingOperations() {
49  }
50 
51  void decrementOutstandingOperations() {
52  if (0 != --m_numOutstandingOperations)
53  return;
54 
55  m_promise.set_value(true);
56  }
57 
58  private:
59  std::atomic<size_t> m_numOutstandingOperations;
61  };
62 
63  // endregion
64 
65  // region DecrementGuard
66 
67  class DecrementGuard {
68  public:
69  explicit DecrementGuard(ParallelContext& context) : m_context(context)
70  {}
71 
72  ~DecrementGuard() {
73  m_context.decrementOutstandingOperations();
74  }
75 
76  private:
77  ParallelContext& m_context;
78  };
79 
80  // endregion
81 
82  auto pParallelContext = std::make_shared<ParallelContext>();
83  DecrementGuard mainOperationGuard(*pParallelContext);
84 
85  auto numRemainingPartitions = numPartitions;
86  auto numTotalItems = items.size();
87  auto numRemainingItems = numTotalItems;
88  auto itBegin = items.begin();
89  while (numRemainingItems > 0) {
90  // note: in the case that numRemainingItems is not divisible by numRemainingPartitions,
91  // give the current partition one more item in order to ensure that
92  // the partitions cover all items
93  auto isDivisible = 0 == numRemainingItems % numRemainingPartitions;
94  auto size = numRemainingItems / numRemainingPartitions + (isDivisible ? 0 : 1);
95  auto itEnd = itBegin;
96  std::advance(itEnd, static_cast<typename decltype(itEnd)::difference_type>(size));
97 
98  // each thread captures pParallelContext by value, which keeps that object alive
99  pParallelContext->incrementOutstandingOperations();
100  auto startIndex = numTotalItems - numRemainingItems;
101  auto batchIndex = numPartitions - numRemainingPartitions;
102  boost::asio::post(ioContext, [callback, pParallelContext, itBegin, itEnd, startIndex, batchIndex]() {
103  DecrementGuard threadOperationGuard(*pParallelContext);
104  callback(itBegin, itEnd, startIndex, batchIndex);
105  });
106 
107  numRemainingItems -= size;
108  --numRemainingPartitions;
109  itBegin = itEnd;
110  }
111 
112  return pParallelContext->future();
113  }
114 
117  template<typename TItems, typename TWorkCallback>
118  thread::future<bool> ParallelFor(boost::asio::io_context& ioContext, TItems& items, size_t numPartitions, TWorkCallback callback) {
119  return ParallelForPartition(ioContext, items, numPartitions, [callback](auto itBegin, auto itEnd, auto startIndex, auto) {
120  auto i = 0u;
121  for (auto iter = itBegin; itEnd != iter; ++iter, ++i) {
122  if (!callback(*iter, startIndex + i))
123  break;
124  }
125  });
126  }
127 }}
m_numOutstandingOperations
size_t m_numOutstandingOperations
Definition: SocketReader.cpp:132
catapult::thread::future
Provides a way to access the result of an asynchronous operation.
Definition: Future.h:29
size
uint64_t size
Definition: MemoryCounters.cpp:65
catapult::thread::ParallelForPartition
thread::future< bool > ParallelForPartition(boost::asio::io_context &ioContext, TItems &items, size_t numPartitions, TWorkCallback callback)
Definition: ParallelFor.h:30
m_promise
thread::promise< CompareChainsResult > m_promise
Definition: CompareChains.cpp:166
catapult
Definition: AddressExtractionExtension.cpp:28
Future.h
m_context
MongoStorageContext & m_context
Definition: MongoBlockStorage.cpp:239
catapult::thread::promise
Stores the result of an asynchronous operation.
Definition: Future.h:85
catapult::thread::ParallelFor
thread::future< bool > ParallelFor(boost::asio::io_context &ioContext, TItems &items, size_t numPartitions, TWorkCallback callback)
Definition: ParallelFor.h:118