23 #include <boost/asio.hpp>
25 namespace catapult {
namespace thread {
// Splits `items` into iterator ranges and posts one task per range to `ioContext`;
// each task invokes `callback(itBegin, itEnd, startIndex, batchIndex)`.
// Returns the future exposed by the shared ParallelContext.
// NOTE(review): the function name and the `items` / `numPartitions` parameter lines
// are not visible in this excerpt - confirm against the full file.
29 template<
typename TItems,
typename TWorkCallback>
31 boost::asio::io_context& ioContext,
34 TWorkCallback callback) {
// Shared bookkeeping object: offers increment/decrement of an outstanding-operation
// count and a future() accessor (method bodies are not visible in this excerpt).
36 class ParallelContext {
47 void incrementOutstandingOperations() {
51 void decrementOutstandingOperations() {
// Helper that stores a reference to a ParallelContext and calls
// decrementOutstandingOperations() on it - presumably from its destructor, so that the
// decrement happens on scope exit; the enclosing member is not visible here, TODO confirm.
67 class DecrementGuard {
69 explicit DecrementGuard(ParallelContext& context) :
m_context(context)
73 m_context.decrementOutstandingOperations();
// The context is held by shared_ptr so that the posted tasks can share ownership of it.
82 auto pParallelContext = std::make_shared<ParallelContext>();
// Guard tied to this (posting) function's own scope.
// NOTE(review): presumably the context's initial count accounts for this guard - confirm.
83 DecrementGuard mainOperationGuard(*pParallelContext);
// NOTE(review): if numPartitions is 0 while items is non-empty, the modulo inside the loop
// divides by zero - confirm that callers (or a line not shown here) guarantee a positive value.
85 auto numRemainingPartitions = numPartitions;
86 auto numTotalItems = items.size();
87 auto numRemainingItems = numTotalItems;
88 auto itBegin = items.begin();
// One iteration per partition, until every item has been assigned to a partition.
89 while (numRemainingItems > 0) {
// Partition size is ceil(numRemainingItems / numRemainingPartitions): the remaining items
// are spread over the remaining partitions, rounding up when they do not divide evenly.
93 auto isDivisible = 0 == numRemainingItems % numRemainingPartitions;
94 auto size = numRemainingItems / numRemainingPartitions + (isDivisible ? 0 : 1);
// Move itEnd forward by the partition size (the declaration of itEnd is not visible here).
96 std::advance(itEnd,
static_cast<typename decltype(itEnd)::difference_type
>(
size));
// Register the task with the context before it is posted.
99 pParallelContext->incrementOutstandingOperations();
// startIndex: number of items assigned to earlier partitions.
// batchIndex: zero-based index of the current partition.
100 auto startIndex = numTotalItems - numRemainingItems;
101 auto batchIndex = numPartitions - numRemainingPartitions;
// Everything is captured by value; the shared_ptr copy keeps the context alive for the task.
102 boost::asio::post(ioContext, [callback, pParallelContext, itBegin, itEnd, startIndex, batchIndex]() {
// Guard scoped to the task body, pairing with the increment performed before posting.
103 DecrementGuard threadOperationGuard(*pParallelContext);
104 callback(itBegin, itEnd, startIndex, batchIndex);
// Account for the items and the partition consumed by this iteration.
107 numRemainingItems -=
size;
108 --numRemainingPartitions;
// Hand the context's future back to the caller.
112 return pParallelContext->future();
117 template<
typename TItems,
typename TWorkCallback>
119 return ParallelForPartition(ioContext, items, numPartitions, [callback](
auto itBegin,
auto itEnd,
auto startIndex,
auto) {
121 for (
auto iter = itBegin; itEnd != iter; ++iter, ++i) {
122 if (!callback(*iter, startIndex + i))