CatapultServer  v0.5.0.1 (Elephant)
MultiServicePool.h
Go to the documentation of this file.
1 
21 #pragma once
22 #include "IoThreadPool.h"
23 #include "catapult/utils/Logging.h"
24 #include "catapult/functions.h"
25 #include <memory>
26 #include <thread>
27 #include <vector>
28 
29 namespace catapult { namespace thread {
30 
34  public:
36  enum class IsolatedPoolMode {
    // Isolated pool mode: selects whether sub pool isolation is enabled or disabled.

    // Sub pool isolation is enabled.
38  Enabled,
39 
    // Sub pool isolation is disabled.
41  Disabled
42  };
43 
45  static constexpr size_t DefaultPoolConcurrency() {
    // A default pool concurrency level based on the local hardware configuration.
    // The value 0 acts as a sentinel: CreateThreadPool replaces a thread count equal to
    // this value with std::thread::hardware_concurrency().
46  return 0;
47  }
48 
49  public:
50  // region ServiceGroup
51 
56  class ServiceGroup {
    // A named group of services that share one thread pool and are shut down together.
    // The group keeps a type-erased reference to every registered service plus one
    // shutdown function per service.
57  public:
    // Creates a group around pPool and name.
59  ServiceGroup(const std::shared_ptr<thread::IoThreadPool>& pPool, const std::string& name)
60  : m_pPool(pPool)
61  , m_name(name)
62  {}
63 
64  public:
    // Gets the number of services.
    // The count is taken from the shutdown functions, so it is 0 again after shutdown()
    // has cleared them.
66  size_t numServices() const {
67  return m_shutdownFunctions.size();
68  }
69 
70  public:
    // Registers an externally created service (pService) for cleanup and returns it.
    // TService must provide a shutdown() member, which the stored shutdown function calls.
72  template<typename TService>
73  auto registerService(const std::shared_ptr<TService>& pService) {
    // keep a type-erased reference so shutdown() can wait for outstanding references
74  m_services.push_back(pService);
75 
    // services are numbered from 1 in registration order: "<group name> service <n>"
76  auto serviceName = m_name + " service " + std::to_string(m_shutdownFunctions.size() + 1);
    // the closure copies pService, so it holds its own reference until the shutdown
    // functions are cleared
77  m_shutdownFunctions.push_back([pService, serviceName]() {
78  // shutdown the service
79  CATAPULT_LOG(info) << "shutting down " << serviceName;
80  pService->shutdown();
81  });
82 
83  CATAPULT_LOG(debug) << "registered " << serviceName;
84  return pService;
85  }
86 
    // Creates a new service by calling factory with the group's pool followed by args,
    // then registers the result for cleanup and returns it.
88  template<typename TFactory, typename... TArgs>
89  auto pushService(TFactory factory, TArgs&&... args) {
90  // create a service around the pool
91  auto pService = factory(m_pPool, std::forward<TArgs>(args)...);
92  return registerService(pService);
93  }
94 
    // Safely shuts down the service group.
96  void shutdown() {
97  // 1. shutdown the services
    // reverse iteration: the most recently registered service is shut down first
98  for (auto iter = m_shutdownFunctions.rbegin(); m_shutdownFunctions.rend() != iter; ++iter)
99  (*iter)();
100 
    // destroying the closures drops the service references they captured
101  m_shutdownFunctions.clear();
102 
103  // 2. wait for all service references to be released
    // WaitForLastReference spins (yielding) until m_services holds the only reference
104  for (const auto& pService : m_services)
105  WaitForLastReference(pService);
106  }
107 
108  private:
    // pool handed to service factories
109  std::shared_ptr<thread::IoThreadPool> m_pPool;
    // prefix used when building service names for log messages
110  std::string m_name;
    // type-erased references to all registered services
111  std::vector<std::shared_ptr<void>> m_services;
    // one shutdown function per registered service, in registration order
    // NOTE(review): `action` is not declared in this listing; presumably it comes from
    // catapult/functions.h - confirm.
112  std::vector<action> m_shutdownFunctions;
113  };
114 
115  // endregion
116 
117  public:
121  MultiServicePool(const std::string& name, size_t numWorkerThreads, IsolatedPoolMode isolatedPoolMode = IsolatedPoolMode::Enabled)
     // Creates a multi service pool with the given name and isolated pool mode; the pool
     // starts with zero service groups.
     // NOTE(review): listing lines 124 and 126 are missing from this extract. Presumably
     // they initialize m_numTotalIsolatedPoolThreads and m_pPool (numWorkerThreads is
     // otherwise unused in the visible initializers) - confirm against the original header.
122  : m_name(name)
123  , m_isolatedPoolMode(isolatedPoolMode)
125  , m_numServiceGroups(0)
127  {}
128 
131  shutdown();
     // Destroys the pool by delegating to shutdown(), which returns early when the pool
     // has already been reset.
     // NOTE(review): the opening line of this definition (listing line 130) is missing
     // from this extract; per the generated index it is the ~MultiServicePool() destructor.
132  }
133 
134  public:
136  size_t numWorkerThreads() const {
     // Gets the number of active worker threads: the main pool's threads (0 once the main
     // pool has been reset) plus the threads of all isolated pools.
137  return (m_pPool ? m_pPool->numWorkerThreads() : 0) + m_numTotalIsolatedPoolThreads;
138  }
139 
141  size_t numServiceGroups() const {
     // Gets the number of service groups; shutdown() resets this counter to 0.
142  return m_numServiceGroups;
143  }
144 
147  size_t numServices() const {
     // Sums the number of services registered in each service group.
148  // don't include the service groups in the count
     // NOTE(review): listing line 149 is missing from this extract; presumably it declares
     // the local accumulator (size_t numServices = 0) used below - confirm.
150  for (const auto& pGroup : m_serviceGroups)
151  numServices += pGroup->numServices();
152 
153  return numServices;
154  }
155 
156  private:
     // Registers pService for shutdown under serviceName (used only in the log message)
     // and returns pService. TService must provide a shutdown() member.
157  template<typename TService>
158  auto registerService(const std::shared_ptr<TService>& pService, const std::string& serviceName) {
     // the closure copies pService, so it keeps the service alive until it has run and
     // the shutdown functions are cleared
159  m_shutdownFunctions.push_back([pService, serviceName]() {
160  CATAPULT_LOG(info) << "shutting down " << serviceName;
161 
162  // shutdown the service and wait for all callbacks to complete
163  pService->shutdown();
     // spins until the closure's captured copy is the only remaining reference
164  WaitForLastReference(pService);
165  });
166 
167  return pService;
168  }
169 
170  public:
172  std::shared_ptr<ServiceGroup> pushServiceGroup(const std::string& name) {
     // Creates a new service group with name. The group shares the main pool and is given
     // the qualified name "<pool name>::<name>".
173  auto pGroup = std::make_shared<ServiceGroup>(m_pPool, m_name + "::" + name);
     // the pool keeps its own reference to the group (released first in shutdown())
174  m_serviceGroups.push_back(pGroup);
     // the group is also registered as a service so that shutdown() shuts it down
175  registerService(pGroup, name + " (service group)");
     // NOTE(review): listing line 176 is missing from this extract; presumably it
     // increments m_numServiceGroups - confirm.
177  return pGroup;
178  }
179 
181  std::shared_ptr<thread::IoThreadPool> pushIsolatedPool(const std::string& name) {
     // Creates a new isolated thread pool with a default number of threads and name.
     // NOTE(review): the body (listing line 182) is missing from this extract; presumably
     // it forwards to the two-argument overload with DefaultPoolConcurrency() - confirm.
183  }
184 
187  std::shared_ptr<thread::IoThreadPool> pushIsolatedPool(const std::string& name, size_t numWorkerThreads) {
     // Creates a new started thread pool with numWorkerThreads threads and name, registers
     // it for shutdown and adds its threads to the isolated thread total.
     // Local adapter that gives a thread pool the shutdown() member registerService requires.
188  class PoolServiceAdapter {
189  public:
190  explicit PoolServiceAdapter(const std::shared_ptr<thread::IoThreadPool>& pPool) : m_pPool(pPool)
191  {}
192 
193  public:
194  void shutdown() {
     // NOTE(review): listing line 195 is missing from this extract; its content cannot
     // be determined from here.
196  m_pPool->join();
197  }
198 
199  private:
200  std::shared_ptr<thread::IoThreadPool> m_pPool;
201  };
202 
203  // when isolated pool mode is disabled, use the main pool for everything
     // NOTE(review): listing line 204 is missing from this extract. As shown, the return
     // below is unconditional and everything after it is unreachable; presumably line 204
     // guards it with a check of m_isolatedPoolMode against IsolatedPoolMode::Disabled - confirm.
205  return m_pPool;
206 
207  auto pPool = CreateThreadPool(numWorkerThreads, name);
208  registerService(std::make_shared<PoolServiceAdapter>(pPool), name + " (isolated pool)");
     // count the threads the pool reports
209  m_numTotalIsolatedPoolThreads += pPool->numWorkerThreads();
210  return pPool;
211  }
212 
213  public:
215  void shutdown() {
     // Safely shuts down the thread pool and its dependent services.
     // Calling it again after the pool has been reset is a no-op.
216  // if the pool has already been destroyed, don't do anything
217  if (!m_pPool)
218  return;
219 
220  // 1. clear the dependent entities
     // dropping these references first lets the WaitForLastReference call inside each
     // group's shutdown function finish once external holders release the group
221  m_serviceGroups.clear();
222 
223  // 2. shutdown the services
     // reverse iteration: the most recently registered service is shut down first
224  for (auto iter = m_shutdownFunctions.rbegin(); m_shutdownFunctions.rend() != iter; ++iter)
225  (*iter)();
226 
227  m_shutdownFunctions.clear();
     // NOTE(review): listing line 228 is missing from this extract; presumably it resets
     // m_numTotalIsolatedPoolThreads - confirm.
229  m_numServiceGroups = 0;
230 
231  // 3. after the services are destroyed, the thread pool can be safely destroyed
     // NOTE(review): listing line 232 is missing from this extract; presumably it joins
     // the main pool before the reset below - confirm.
233  m_pPool.reset();
234  }
235 
236  private:
     // Creates and starts a thread pool with the given name and returns it as a shared pointer.
237  static std::shared_ptr<thread::IoThreadPool> CreateThreadPool(size_t numWorkerThreads, const std::string& name) {
     // a thread count equal to DefaultPoolConcurrency() is replaced with the hardware concurrency
     // NOTE(review): std::thread::hardware_concurrency() may itself return 0; how
     // CreateIoThreadPool treats a zero thread count is not visible here.
238  numWorkerThreads = DefaultPoolConcurrency() == numWorkerThreads ? std::thread::hardware_concurrency() : numWorkerThreads;
239  auto pPool = thread::CreateIoThreadPool(numWorkerThreads, name.c_str());
240  pPool->start();
     // per the generated index CreateIoThreadPool returns std::unique_ptr<IoThreadPool>;
     // the explicit move converts it to the std::shared_ptr return type
241  return std::move(pPool);
242  }
243 
     // Blocks the calling thread until pVoid is the only remaining reference to its object.
     // This is a polling loop that yields between checks; it does not return while any
     // other holder keeps a copy of the shared pointer.
244  template<typename T>
245  static void WaitForLastReference(const std::shared_ptr<T>& pVoid) {
     // NOTE(review): the purpose of `volatile` on this local is not visible here;
     // presumably it keeps the last observed count inspectable while debugging - confirm.
246  volatile long useCount;
247  while (1 < (useCount = pVoid.use_count()))
248  std::this_thread::yield();
249  }
250 
251  private:
252  std::string m_name;
256  std::shared_ptr<thread::IoThreadPool> m_pPool;
257  std::vector<std::shared_ptr<ServiceGroup>> m_serviceGroups;
258  std::vector<action> m_shutdownFunctions;
259  };
260 }}
catapult::thread::MultiServicePool::m_numTotalIsolatedPoolThreads
size_t m_numTotalIsolatedPoolThreads
Definition: MultiServicePool.h:254
catapult::thread::MultiServicePool::ServiceGroup::m_shutdownFunctions
std::vector< action > m_shutdownFunctions
Definition: MultiServicePool.h:112
CATAPULT_LOG
#define CATAPULT_LOG(SEV)
Writes a log entry to the default logger with SEV severity.
Definition: Logging.h:340
catapult::thread::MultiServicePool::CreateThreadPool
static std::shared_ptr< thread::IoThreadPool > CreateThreadPool(size_t numWorkerThreads, const std::string &name)
Definition: MultiServicePool.h:237
catapult::thread::MultiServicePool::m_shutdownFunctions
std::vector< action > m_shutdownFunctions
Definition: MultiServicePool.h:258
Parser.debug
def debug(*args)
Definition: Parser.py:46
catapult::thread::MultiServicePool::pushServiceGroup
std::shared_ptr< ServiceGroup > pushServiceGroup(const std::string &name)
Creates a new service group with name.
Definition: MultiServicePool.h:172
catapult::thread::MultiServicePool::numWorkerThreads
size_t numWorkerThreads() const
Gets the number of active worker threads.
Definition: MultiServicePool.h:136
catapult::thread::MultiServicePool::ServiceGroup::pushService
auto pushService(TFactory factory, TArgs &&... args)
Creates a new service by calling factory with args service arguments.
Definition: MultiServicePool.h:89
catapult::thread::MultiServicePool::ServiceGroup::registerService
auto registerService(const std::shared_ptr< TService > &pService)
Registers an externally created service (pService) for cleanup.
Definition: MultiServicePool.h:73
catapult::thread::MultiServicePool::DefaultPoolConcurrency
static constexpr size_t DefaultPoolConcurrency()
A default pool concurrency level based on the local hardware configuration.
Definition: MultiServicePool.h:45
catapult::thread::MultiServicePool::m_pPool
std::shared_ptr< thread::IoThreadPool > m_pPool
Definition: MultiServicePool.h:256
catapult::thread::MultiServicePool::ServiceGroup::shutdown
void shutdown()
Safely shuts down the service group.
Definition: MultiServicePool.h:96
catapult::thread::MultiServicePool::ServiceGroup::m_services
std::vector< std::shared_ptr< void > > m_services
Definition: MultiServicePool.h:111
catapult::thread::MultiServicePool
Definition: MultiServicePool.h:33
functions.h
catapult::thread::MultiServicePool::pushIsolatedPool
std::shared_ptr< thread::IoThreadPool > pushIsolatedPool(const std::string &name)
Creates a new isolated thread pool with a default number of threads and name.
Definition: MultiServicePool.h:181
catapult::thread::MultiServicePool::ServiceGroup::m_pPool
std::shared_ptr< thread::IoThreadPool > m_pPool
Definition: MultiServicePool.h:109
catapult::thread::MultiServicePool::IsolatedPoolMode::Enabled
Sub pool isolation is enabled.
catapult::thread::CreateIoThreadPool
std::unique_ptr< IoThreadPool > CreateIoThreadPool(size_t numWorkerThreads, const char *name)
Definition: IoThreadPool.cpp:146
catapult::thread::MultiServicePool::shutdown
void shutdown()
Safely shuts down the thread pool and its dependent services.
Definition: MultiServicePool.h:215
catapult::thread::MultiServicePool::pushIsolatedPool
std::shared_ptr< thread::IoThreadPool > pushIsolatedPool(const std::string &name, size_t numWorkerThreads)
Definition: MultiServicePool.h:187
catapult::thread::MultiServicePool::ServiceGroup::ServiceGroup
ServiceGroup(const std::shared_ptr< thread::IoThreadPool > &pPool, const std::string &name)
Creates a group around pPool and name.
Definition: MultiServicePool.h:59
catapult::thread::MultiServicePool::ServiceGroup
Definition: MultiServicePool.h:56
catapult::thread::MultiServicePool::ServiceGroup::numServices
size_t numServices() const
Gets the number of services.
Definition: MultiServicePool.h:66
catapult::thread::MultiServicePool::~MultiServicePool
~MultiServicePool()
Destroys the pool.
Definition: MultiServicePool.h:130
catapult::thread::MultiServicePool::registerService
auto registerService(const std::shared_ptr< TService > &pService, const std::string &serviceName)
Definition: MultiServicePool.h:158
IoThreadPool.h
forwardsValidation.info
def info(*args)
Definition: forwardsValidation.py:12
catapult::thread::MultiServicePool::numServiceGroups
size_t numServiceGroups() const
Gets the number of service groups.
Definition: MultiServicePool.h:141
catapult::thread::MultiServicePool::IsolatedPoolMode
IsolatedPoolMode
Isolated pool mode.
Definition: MultiServicePool.h:36
catapult::thread::MultiServicePool::ServiceGroup::m_name
std::string m_name
Definition: MultiServicePool.h:110
catapult::thread::MultiServicePool::m_numServiceGroups
size_t m_numServiceGroups
Definition: MultiServicePool.h:255
catapult::thread::MultiServicePool::MultiServicePool
MultiServicePool(const std::string &name, size_t numWorkerThreads, IsolatedPoolMode isolatedPoolMode=IsolatedPoolMode::Enabled)
Definition: MultiServicePool.h:121
catapult
Definition: AddressExtractionExtension.cpp:28
catapult::thread::MultiServicePool::WaitForLastReference
static void WaitForLastReference(const std::shared_ptr< T > &pVoid)
Definition: MultiServicePool.h:245
catapult::thread::MultiServicePool::IsolatedPoolMode::Disabled
Sub pool isolation is disabled.
Logging.h
catapult::thread::MultiServicePool::numServices
size_t numServices() const
Definition: MultiServicePool.h:147
catapult::thread::MultiServicePool::m_serviceGroups
std::vector< std::shared_ptr< ServiceGroup > > m_serviceGroups
Definition: MultiServicePool.h:257
catapult::thread::MultiServicePool::m_name
std::string m_name
Definition: MultiServicePool.h:252
catapult::thread::MultiServicePool::m_isolatedPoolMode
IsolatedPoolMode m_isolatedPoolMode
Definition: MultiServicePool.h:253