CatapultServer  v0.5.0.1 (Elephant)
MongoBulkWriter.h
Go to the documentation of this file.
1 
21 #pragma once
22 #include "BulkWriteResult.h"
29 #include "catapult/exceptions.h"
30 #include "catapult/types.h"
31 #include <boost/asio/io_context.hpp>
32 #include <bsoncxx/json.hpp>
33 #include <mongocxx/client.hpp>
34 #include <mongocxx/config/version.hpp>
35 #include <mongocxx/exception/bulk_write_exception.hpp>
36 #include <mongocxx/pool.hpp>
37 #include <unordered_set>
38 
39 namespace catapult { namespace mongo {
40 
43  class MongoBulkWriter final : public std::enable_shared_from_this<MongoBulkWriter> {
44  private:
45  struct BulkWriteParams {
46  public:
47  explicit BulkWriteParams(MongoBulkWriter& mongoBulkWriter, const std::string& collectionName)
48  : pConnection(mongoBulkWriter.m_connectionPool.acquire())
49  , Database(pConnection->database(mongoBulkWriter.m_dbName))
50  , Collection(Database[collectionName])
51  , Bulk(Collection.create_bulk_write())
52  {}
53 
54  public:
55  mongocxx::pool::entry pConnection;
56  mongocxx::database Database;
57  mongocxx::collection Collection;
58  mongocxx::bulk_write Bulk;
59  };
60 
61  using AccountStates = std::unordered_set<std::shared_ptr<const state::AccountState>>;
63 
64  template<typename TEntity>
66 
67  template<typename TEntity>
68  using CreateDocument = std::function<bsoncxx::document::value (const TEntity&, uint32_t)>;
69 
70  template<typename TEntity>
71  using CreateDocuments = std::function<std::vector<bsoncxx::document::value> (const TEntity&, uint32_t)>;
72 
73  template<typename TEntity>
74  using CreateFilter = std::function<bsoncxx::document::value (const TEntity&)>;
75 
76  private:
77  MongoBulkWriter(const mongocxx::uri& uri, const std::string& dbName, const std::shared_ptr<thread::IoThreadPool>& pPool)
78  : m_dbName(dbName)
79  , m_pPool(pPool)
80  , m_ioContext(pPool->ioContext())
81  , m_connectionPool(uri)
82  {}
83 
84  public:
87  static std::shared_ptr<MongoBulkWriter> Create(
88  const mongocxx::uri& uri,
89  const std::string& dbName,
90  const std::shared_ptr<thread::IoThreadPool>& pPool) {
91  // cannot use make_shared with private constructor
92  auto pData = utils::MakeUniqueWithSize<uint8_t>(sizeof(MongoBulkWriter));
93  auto pWriterRaw = new (pData.get()) MongoBulkWriter(uri, dbName, pPool);
94  auto pWriter = std::shared_ptr<MongoBulkWriter>(pWriterRaw);
95  pData.release();
96  return pWriter;
97  }
98 
99  public:
102  template<typename TContainer>
104  const std::string& collectionName,
105  const TContainer& entities,
106  const CreateDocument<typename TContainer::value_type>& createDocument) {
107  auto appendOperation = [createDocument](auto& bulk, const auto& entity, auto index) {
108  auto entityDocument = createDocument(entity, index);
109  bulk.append(mongocxx::model::insert_one(entityDocument.view()));
110  };
111 
112  return bulkWrite<TContainer>(collectionName, entities, appendOperation);
113  }
114 
117  template<typename TContainer>
119  const std::string& collectionName,
120  const TContainer& entities,
121  const CreateDocuments<typename TContainer::value_type>& createDocuments) {
122  auto appendOperation = [createDocuments](auto& bulk, const auto& entity, auto index) {
123  for (const auto& entityDocument : createDocuments(entity, index))
124  bulk.append(mongocxx::model::insert_one(entityDocument.view()));
125  };
126 
127  return bulkWrite<TContainer>(collectionName, entities, appendOperation);
128  }
129 
132  template<typename TContainer>
134  const std::string& collectionName,
135  const TContainer& entities,
137  const CreateFilter<typename TContainer::value_type>& createFilter) {
138  auto appendOperation = [createDocument, createFilter](auto& bulk, const auto& entity, auto index) {
139  auto entityDocument = createDocument(entity, index);
140  auto filter = createFilter(entity);
141  mongocxx::model::replace_one replace_op(filter.view(), entityDocument.view());
142  replace_op.upsert(true);
143  bulk.append(replace_op);
144  };
145 
146  return bulkWrite<TContainer>(collectionName, entities, appendOperation);
147  }
148 
150  template<typename TContainer>
152  const std::string& collectionName,
153  const TContainer& entities,
154  const CreateFilter<typename TContainer::value_type>& createFilter) {
155  auto appendOperation = [createFilter](auto& bulk, const auto& entity, auto) {
156  auto filter = createFilter(entity);
157  bulk.append(mongocxx::model::delete_many(filter.view()));
158  };
159 
160  return bulkWrite<TContainer>(collectionName, entities, appendOperation);
161  }
162 
163  private:
164  thread::future<BulkWriteResult> handleBulkOperation(std::shared_ptr<BulkWriteParams>&& pBulkWriteParams) {
165  // note: pBulkWriteParams depends on pThis (pBulkWriteParams.pConnection depends on pThis.m_connectionPool)
166  // it's crucial to move pBulkWriteParams into lambda, otherwise it would be copied while pThis would be moved
167  auto pPromise = std::make_shared<thread::promise<BulkWriteResult>>();
168  boost::asio::post(m_ioContext, [pThis = shared_from_this(), pBulkWriteParams{std::move(pBulkWriteParams)}, pPromise]() {
169  pThis->bulkWrite(*pBulkWriteParams, *pPromise);
170  });
171 
172  return pPromise->get_future();
173  }
174 
175  void bulkWrite(BulkWriteParams& bulkWriteParams, thread::promise<BulkWriteResult>& promise) {
176  try {
177  // if something goes wrong mongo will throw, else a result is always available
178  auto result = bulkWriteParams.Bulk.execute().get();
179  promise.set_value(BulkWriteResult(result));
180  } catch (const mongocxx::bulk_write_exception& e) {
181  std::ostringstream stream;
182  stream << "message: " << e.code().message();
183  if (e.raw_server_error()) {
184  auto description = bsoncxx::to_json(e.raw_server_error().get());
185  stream << ", description: " << description;
186  }
187 
188  CATAPULT_LOG(fatal) << "throwing exception: " << stream.str().c_str();
189  promise.set_exception(std::make_exception_ptr(catapult_runtime_error(stream.str().c_str())));
190  }
191  }
192 
194  public:
195  explicit BulkWriteContext(size_t numOperations) : m_futures(numOperations)
196  {}
197 
198  public:
200  return thread::when_all(std::move(m_futures));
201  }
202 
203  void setFutureAt(size_t index, thread::future<BulkWriteResult>&& future) {
204  m_futures[index] = std::move(future);
205  }
206 
207  private:
208  std::vector<thread::future<BulkWriteResult>> m_futures;
209  };
210 
211  template<typename TEntity, typename TContainer>
213  const std::string& collectionName,
214  const TContainer& entities,
215  const AppendOperation<typename TContainer::value_type>& appendOperation) {
216  if (entities.empty())
218 
219  auto numThreads = m_pPool->numWorkerThreads();
220  auto pContext = std::make_shared<BulkWriteContext>(std::min<size_t>(entities.size(), numThreads));
221  auto workCallback = [pThis = shared_from_this(), entitiesStart = entities.cbegin(), collectionName, appendOperation, pContext](
222  auto itBegin,
223  auto itEnd,
224  auto startIndex,
225  auto batchIndex) {
226  auto pBulkWriteParams = std::make_shared<BulkWriteParams>(*pThis, collectionName);
227 
228  auto index = static_cast<uint32_t>(startIndex);
229  for (auto iter = itBegin; itEnd != iter; ++iter, ++index)
230  appendOperation(pBulkWriteParams->Bulk, *iter, index);
231 
232  pContext->setFutureAt(batchIndex, pThis->handleBulkOperation(std::move(pBulkWriteParams)));
233  };
234  return thread::compose(thread::ParallelForPartition(m_ioContext, entities, numThreads, workCallback), [pContext](const auto&) {
235  return pContext->aggregateFuture();
236  });
237  }
238 
239  private:
240  std::string m_dbName;
241  std::shared_ptr<const thread::IoThreadPool> m_pPool;
242  boost::asio::io_context& m_ioContext;
243  mongocxx::pool m_connectionPool;
244  };
245 }}
catapult::mongo::MongoBulkWriter::bulkDelete
BulkWriteResultFuture bulkDelete(const std::string &collectionName, const TContainer &entities, const CreateFilter< typename TContainer::value_type > &createFilter)
Deletes entities from the collection named collectionName matching the specified entity filter (createFilter).
Definition: MongoBulkWriter.h:151
FutureUtils.h
catapult::mongo::MongoBulkWriter::BulkWriteContext
Definition: MongoBulkWriter.h:193
BulkWriteResult.h
CATAPULT_LOG
#define CATAPULT_LOG(SEV)
Writes a log entry to the default logger with SEV severity.
Definition: Logging.h:340
catapult::mongo::MongoBulkWriter::BulkWriteParams::BulkWriteParams
BulkWriteParams(MongoBulkWriter &mongoBulkWriter, const std::string &collectionName)
Definition: MongoBulkWriter.h:47
catapult::mongo::MongoBulkWriter::bulkInsert
BulkWriteResultFuture bulkInsert(const std::string &collectionName, const TContainer &entities, const CreateDocument< typename TContainer::value_type > &createDocument)
Definition: MongoBulkWriter.h:103
exceptions.h
MemoryUtils.h
catapult::mongo::MongoBulkWriter::Create
static std::shared_ptr< MongoBulkWriter > Create(const mongocxx::uri &uri, const std::string &dbName, const std::shared_ptr< thread::IoThreadPool > &pPool)
Definition: MongoBulkWriter.h:87
catapult::mongo::MongoBulkWriter::BulkWriteParams::Collection
mongocxx::collection Collection
Definition: MongoBulkWriter.h:57
catapult::thread::compose
auto compose(future< TSeed > &&startFuture, TCreateNextFuture createNextFuture)
Definition: FutureUtils.h:102
catapult::mongo::MongoBulkWriter::BulkWriteContext::setFutureAt
void setFutureAt(size_t index, thread::future< BulkWriteResult > &&future)
Definition: MongoBulkWriter.h:203
Elements.h
catapult::mongo::BulkWriteResult
Result of a bulk write operation to the database.
Definition: BulkWriteResult.h:27
ParallelFor.h
catapult::mongo::MongoBulkWriter::bulkWrite
BulkWriteResultFuture bulkWrite(const std::string &collectionName, const TContainer &entities, const AppendOperation< typename TContainer::value_type > &appendOperation)
Definition: MongoBulkWriter.h:212
catapult::thread::when_all
future< std::vector< future< T > > > when_all(std::vector< future< T >> &&allFutures)
Returns a future that is signaled when all futures in allFutures complete.
Definition: FutureUtils.h:31
catapult::mongo::MongoBulkWriter::AccountStates
std::unordered_set< std::shared_ptr< const state::AccountState > > AccountStates
Definition: MongoBulkWriter.h:61
catapult::mongo::MongoBulkWriter::bulkInsert
BulkWriteResultFuture bulkInsert(const std::string &collectionName, const TContainer &entities, const CreateDocuments< typename TContainer::value_type > &createDocuments)
Definition: MongoBulkWriter.h:118
catapult::mongo::MongoBulkWriter::BulkWriteContext::aggregateFuture
BulkWriteResultFuture aggregateFuture()
Definition: MongoBulkWriter.h:199
catapult::thread::promise::set_exception
void set_exception(std::exception_ptr pException)
Sets the result of this promise to pException.
Definition: Future.h:115
catapult::mongo::MongoBulkWriter::BulkWriteParams::pConnection
mongocxx::pool::entry pConnection
Definition: MongoBulkWriter.h:55
catapult::mongo::MongoBulkWriter::BulkWriteParams::Database
mongocxx::database Database
Definition: MongoBulkWriter.h:56
catapult::mongo::MongoBulkWriter::bulkUpsert
BulkWriteResultFuture bulkUpsert(const std::string &collectionName, const TContainer &entities, const CreateDocument< typename TContainer::value_type > &createDocument, const CreateFilter< typename TContainer::value_type > &createFilter)
Definition: MongoBulkWriter.h:133
catapult::thread::future
Provides a way to access the result of an asynchronous operation.
Definition: Future.h:29
IoThreadPool.h
catapult::mongo::MongoBulkWriter::CreateDocuments
std::function< std::vector< bsoncxx::document::value >(const TEntity &, uint32_t)> CreateDocuments
Definition: MongoBulkWriter.h:71
catapult::mongo::MongoBulkWriter::m_dbName
std::string m_dbName
Definition: MongoBulkWriter.h:240
catapult::catapult_runtime_error
catapult_error< std::runtime_error > catapult_runtime_error
Definition: exceptions.h:87
catapult::mongo::MongoBulkWriter::m_pPool
std::shared_ptr< const thread::IoThreadPool > m_pPool
Definition: MongoBulkWriter.h:241
catapult::mongo::MongoBulkWriter::CreateFilter
std::function< bsoncxx::document::value(const TEntity &)> CreateFilter
Definition: MongoBulkWriter.h:74
catapult::thread::make_ready_future
future< T > make_ready_future(T &&value)
Produces a future that is ready immediately and holds the given value.
Definition: Future.h:126
catapult::mongo::MongoBulkWriter
Definition: MongoBulkWriter.h:43
catapult::mongo::MongoBulkWriter::m_ioContext
boost::asio::io_context & m_ioContext
Definition: MongoBulkWriter.h:242
catapult::mongo::MongoBulkWriter::BulkWriteContext::m_futures
std::vector< thread::future< BulkWriteResult > > m_futures
Definition: MongoBulkWriter.h:208
catapult::mongo::MongoBulkWriter::m_connectionPool
mongocxx::pool m_connectionPool
Definition: MongoBulkWriter.h:243
catapult::mongo::MongoBulkWriter::BulkWriteParams::Bulk
mongocxx::bulk_write Bulk
Definition: MongoBulkWriter.h:58
AccountState.h
catapult::thread::promise::set_value
void set_value(T &&value)
Sets the result of this promise to value.
Definition: Future.h:110
catapult::mongo::MongoBulkWriter::AppendOperation
consumer< mongocxx::bulk_write &, const TEntity &, uint32_t > AppendOperation
Definition: MongoBulkWriter.h:65
types.h
catapult::thread::ParallelForPartition
thread::future< bool > ParallelForPartition(boost::asio::io_context &ioContext, TItems &items, size_t numPartitions, TWorkCallback callback)
Definition: ParallelFor.h:30
catapult::mongo::MongoBulkWriter::MongoBulkWriter
MongoBulkWriter(const mongocxx::uri &uri, const std::string &dbName, const std::shared_ptr< thread::IoThreadPool > &pPool)
Definition: MongoBulkWriter.h:77
catapult
Definition: AddressExtractionExtension.cpp:28
catapult::mongo::MongoBulkWriter::BulkWriteContext::BulkWriteContext
BulkWriteContext(size_t numOperations)
Definition: MongoBulkWriter.h:195
catapult::mongo::MongoBulkWriter::BulkWriteParams
Definition: MongoBulkWriter.h:45
catapult::mongo::MongoBulkWriter::CreateDocument
std::function< bsoncxx::document::value(const TEntity &, uint32_t)> CreateDocument
Definition: MongoBulkWriter.h:68
catapult::mongo::MongoBulkWriter::handleBulkOperation
thread::future< BulkWriteResult > handleBulkOperation(std::shared_ptr< BulkWriteParams > &&pBulkWriteParams)
Definition: MongoBulkWriter.h:164
catapult::mongo::MongoBulkWriter::bulkWrite
void bulkWrite(BulkWriteParams &bulkWriteParams, thread::promise< BulkWriteResult > &promise)
Definition: MongoBulkWriter.h:175
catapult::consumer
std::function< void(TArgs...)> consumer
A consumer function.
Definition: functions.h:35
catapult::thread::promise
Stores the result of an asynchronous operation.
Definition: Future.h:85