CatapultServer  v0.5.0.1 (Elephant)
FutureUtils.h
Go to the documentation of this file.
1 
21 #pragma once
#include "Future.h"
#include "catapult/exceptions.h"
#include <atomic>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
26 
27 namespace catapult { namespace thread {
28 
30  template<typename T>
31  future<std::vector<future<T>>> when_all(std::vector<future<T>>&& allFutures) {
32 // workaround gcc bug by explicitly specifying inner struct visibility when inner struct contains lambda
33 // https://www.mail-archive.com/gcc-bugs@gcc.gnu.org/msg534746.html
34 #ifdef __GNUC__
35 #define INNER_STRUCT_VISIBILILTY __attribute__ ((visibility ("hidden")))
36 #else
37 #define INNER_STRUCT_VISIBILILTY
38 #endif
39 
40  using FutureType = future<T>;
41  using JointPromiseType = promise<std::vector<FutureType>>;
42 
43  struct INNER_STRUCT_VISIBILILTY ContinuationContext : public std::enable_shared_from_this<ContinuationContext> {
44  public:
45  explicit ContinuationContext(size_t numFutures) : m_futures(numFutures), m_counter(0)
46  {}
47 
48  public:
49  auto future() {
50  return m_jointPromise.get_future();
51  }
52 
53  void setContinuation(FutureType& future, size_t index) {
54  auto pThis = this->shared_from_this();
55  future.then([pThis, index](auto&& continuationFuture) {
56  auto& futures = pThis->m_futures;
57  futures[index] = std::move(continuationFuture);
58  if (futures.size() != ++pThis->m_counter)
59  return;
60 
61  pThis->m_jointPromise.set_value(std::move(futures));
62  });
63  }
64 
65  private:
66  std::vector<FutureType> m_futures;
67  JointPromiseType m_jointPromise;
68  std::atomic<size_t> m_counter;
69  };
70 
71  if (allFutures.empty())
72  CATAPULT_THROW_INVALID_ARGUMENT("when_all cannot join zero futures");
73 
74  auto i = 0u;
75  auto pContext = std::make_shared<ContinuationContext>(allFutures.size());
76  for (auto& future : allFutures)
77  pContext->setContinuation(future, i++);
78 
79  return pContext->future();
80 
81 #undef INNER_STRUCT_VISIBILILTY
82  }
83 
/// Returns a future that is signaled when both \a future1 and \a future2 complete.
/// The two futures are moved into a vector (in argument order) and joined by the vector overload.
// NOTE(review): the declaration line was missing from this listing and has been reconstructed
// from the body - confirm the exact signature against the original header
template<typename T>
future<std::vector<future<T>>> when_all(future<T>&& future1, future<T>&& future2) {
	std::vector<future<T>> futures;
	futures.push_back(std::move(future1));
	futures.push_back(std::move(future2));
	return when_all(std::move(futures));
}
92 
/// Composes \a startFuture with \a createNextFuture, which is called with the completed start future
/// and returns a second future.
/// The returned future is completed with the result of the second future, or with the exception
/// raised while creating or evaluating the second future.
/// \note TResultFuture and TResultType are deduced and are not expected to be specified by callers.
template<
		typename TSeed,
		typename TCreateNextFuture,
		typename TResultFuture = std::invoke_result_t<TCreateNextFuture, future<TSeed>&&>,
		// declval is used so that TResultFuture does not need to be default constructible
		typename TResultType = decltype(std::declval<TResultFuture>().get())>
auto compose(future<TSeed>&& startFuture, TCreateNextFuture createNextFuture) {
	// the promise is shared because both continuations capture it and may run after this function returns
	auto pComposePromise = std::make_shared<promise<TResultType>>();
	startFuture.then([createNextFuture, pComposePromise](auto&& completedFirstFuture) {
		try {
			auto secondFuture = createNextFuture(std::move(completedFirstFuture));
			secondFuture.then([pComposePromise](auto&& completedSecondFuture) {
				try {
					// forward the final result; any exception raised by get is forwarded below instead
					pComposePromise->set_value(completedSecondFuture.get());
				} catch (...) {
					pComposePromise->set_exception(std::current_exception());
				}
			});
		} catch (...) {
			// creating the second future (or attaching its continuation) failed,
			// so the composed future is completed with that failure
			pComposePromise->set_exception(std::current_exception());
		}
	});

	return pComposePromise->get_future();
}
121 
/// Evaluates all \a futures in order and returns the results of those that did not raise an exception.
/// \note Exceptions raised while getting a result are deliberately swallowed and the corresponding
///       future contributes nothing to the returned vector.
template<typename T>
std::vector<T> get_all_ignore_exceptional(std::vector<future<T>>&& futures) {
	std::vector<T> results;

	// at most one result per future, so a single allocation is sufficient
	results.reserve(futures.size());
	for (auto& future : futures) {
		try {
			results.push_back(future.get());
		} catch (...) {
			// suppress: exceptional futures are skipped by design
		}
	}

	return results;
}
137 
/// Evaluates all \a futures in order and returns all of their results.
/// \note Nothing is caught here, so any exception raised while getting a result propagates to the caller.
template<typename T>
std::vector<T> get_all(std::vector<future<T>>&& futures) {
	std::vector<T> results;

	// exactly one result per future, so a single allocation is sufficient
	results.reserve(futures.size());
	for (auto& future : futures)
		results.push_back(future.get());

	return results;
}
148 }}
exceptions.h
catapult::thread::future::get
T get()
Returns the result of this future and blocks until the result is available.
Definition: Future.h:50
catapult::thread::future::then
auto then(TContinuation continuation)
Configures continuation to run at the completion of this future.
Definition: Future.h:59
catapult::thread::compose
auto compose(future< TSeed > &&startFuture, TCreateNextFuture createNextFuture)
Definition: FutureUtils.h:102
catapult::thread::get_all_ignore_exceptional
std::vector< T > get_all_ignore_exceptional(std::vector< future< T >> &&futures)
Definition: FutureUtils.h:125
catapult::thread::when_all
future< std::vector< future< T > > > when_all(std::vector< future< T >> &&allFutures)
Returns a future that is signaled when all futures in allFutures complete.
Definition: FutureUtils.h:31
catapult::thread::future
Provides a way to access the result of an asynchronous operation.
Definition: Future.h:29
INNER_STRUCT_VISIBILILTY
#define INNER_STRUCT_VISIBILILTY
CATAPULT_THROW_INVALID_ARGUMENT
#define CATAPULT_THROW_INVALID_ARGUMENT(MESSAGE)
Macro used to throw a catapult invalid argument.
Definition: exceptions.h:179
catapult
Definition: AddressExtractionExtension.cpp:28
Future.h
catapult::thread::get_all
std::vector< T > get_all(std::vector< future< T >> &&futures)
Definition: FutureUtils.h:141
catapult::thread::promise
Stores the result of an asynchronous operation.
Definition: Future.h:85