CatapultServer
v0.5.0.1 (Elephant)
|
Go to the documentation of this file.
31 #include <boost/asio/io_context.hpp>
32 #include <bsoncxx/json.hpp>
33 #include <mongocxx/client.hpp>
34 #include <mongocxx/config/version.hpp>
35 #include <mongocxx/exception/bulk_write_exception.hpp>
36 #include <mongocxx/pool.hpp>
37 #include <unordered_set>
39 namespace catapult {
namespace mongo {
43 class MongoBulkWriter final :
public std::enable_shared_from_this<MongoBulkWriter> {
61 using AccountStates = std::unordered_set<std::shared_ptr<const state::AccountState>>;
64 template<
typename TEntity>
67 template<
typename TEntity>
68 using CreateDocument = std::function<bsoncxx::document::value (
const TEntity&, uint32_t)>;
70 template<
typename TEntity>
71 using CreateDocuments = std::function<std::vector<bsoncxx::document::value> (
const TEntity&, uint32_t)>;
73 template<
typename TEntity>
74 using CreateFilter = std::function<bsoncxx::document::value (
const TEntity&)>;
77 MongoBulkWriter(
const mongocxx::uri& uri,
const std::string& dbName,
const std::shared_ptr<thread::IoThreadPool>& pPool)
87 static std::shared_ptr<MongoBulkWriter>
Create(
88 const mongocxx::uri& uri,
89 const std::string& dbName,
90 const std::shared_ptr<thread::IoThreadPool>& pPool) {
92 auto pData = utils::MakeUniqueWithSize<uint8_t>(
sizeof(
MongoBulkWriter));
93 auto pWriterRaw =
new (pData.get())
MongoBulkWriter(uri, dbName, pPool);
94 auto pWriter = std::shared_ptr<MongoBulkWriter>(pWriterRaw);
102 template<
typename TContainer>
104 const std::string& collectionName,
105 const TContainer& entities,
107 auto appendOperation = [createDocument](
auto& bulk,
const auto& entity,
auto index) {
108 auto entityDocument = createDocument(entity, index);
109 bulk.append(mongocxx::model::insert_one(entityDocument.view()));
112 return bulkWrite<TContainer>(collectionName, entities, appendOperation);
117 template<
typename TContainer>
119 const std::string& collectionName,
120 const TContainer& entities,
122 auto appendOperation = [createDocuments](
auto& bulk,
const auto& entity,
auto index) {
123 for (
const auto& entityDocument : createDocuments(entity, index))
124 bulk.append(mongocxx::model::insert_one(entityDocument.view()));
127 return bulkWrite<TContainer>(collectionName, entities, appendOperation);
132 template<
typename TContainer>
134 const std::string& collectionName,
135 const TContainer& entities,
138 auto appendOperation = [createDocument, createFilter](
auto& bulk,
const auto& entity,
auto index) {
139 auto entityDocument = createDocument(entity, index);
140 auto filter = createFilter(entity);
141 mongocxx::model::replace_one replace_op(filter.view(), entityDocument.view());
142 replace_op.upsert(
true);
143 bulk.append(replace_op);
146 return bulkWrite<TContainer>(collectionName, entities, appendOperation);
150 template<
typename TContainer>
152 const std::string& collectionName,
153 const TContainer& entities,
155 auto appendOperation = [createFilter](
auto& bulk,
const auto& entity,
auto) {
156 auto filter = createFilter(entity);
157 bulk.append(mongocxx::model::delete_many(filter.view()));
160 return bulkWrite<TContainer>(collectionName, entities, appendOperation);
167 auto pPromise = std::make_shared<thread::promise<BulkWriteResult>>();
168 boost::asio::post(
m_ioContext, [pThis = shared_from_this(), pBulkWriteParams{std::move(pBulkWriteParams)}, pPromise]() {
169 pThis->bulkWrite(*pBulkWriteParams, *pPromise);
172 return pPromise->get_future();
178 auto result = bulkWriteParams.
Bulk.execute().get();
180 }
catch (
const mongocxx::bulk_write_exception& e) {
181 std::ostringstream stream;
182 stream <<
"message: " << e.code().message();
183 if (e.raw_server_error()) {
184 auto description = bsoncxx::to_json(e.raw_server_error().get());
185 stream <<
", description: " << description;
188 CATAPULT_LOG(fatal) <<
"throwing exception: " << stream.str().c_str();
211 template<
typename TEntity,
typename TContainer>
213 const std::string& collectionName,
214 const TContainer& entities,
216 if (entities.empty())
219 auto numThreads =
m_pPool->numWorkerThreads();
220 auto pContext = std::make_shared<BulkWriteContext>(std::min<size_t>(entities.size(), numThreads));
221 auto workCallback = [pThis = shared_from_this(), entitiesStart = entities.cbegin(), collectionName, appendOperation, pContext](
226 auto pBulkWriteParams = std::make_shared<BulkWriteParams>(*pThis, collectionName);
228 auto index = static_cast<uint32_t>(startIndex);
229 for (
auto iter = itBegin; itEnd != iter; ++iter, ++index)
230 appendOperation(pBulkWriteParams->Bulk, *iter, index);
232 pContext->setFutureAt(batchIndex, pThis->handleBulkOperation(std::move(pBulkWriteParams)));
235 return pContext->aggregateFuture();
241 std::shared_ptr<const thread::IoThreadPool>
m_pPool;
BulkWriteResultFuture bulkDelete(const std::string &collectionName, const TContainer &entities, const CreateFilter< typename TContainer::value_type > &createFilter)
Deletes entities from the collection named collectionName matching the specified entity filter (createFilter).
Definition: MongoBulkWriter.h:151
Definition: MongoBulkWriter.h:193
#define CATAPULT_LOG(SEV)
Writes a log entry to the default logger with SEV severity.
Definition: Logging.h:340
BulkWriteParams(MongoBulkWriter &mongoBulkWriter, const std::string &collectionName)
Definition: MongoBulkWriter.h:47
BulkWriteResultFuture bulkInsert(const std::string &collectionName, const TContainer &entities, const CreateDocument< typename TContainer::value_type > &createDocument)
Definition: MongoBulkWriter.h:103
static std::shared_ptr< MongoBulkWriter > Create(const mongocxx::uri &uri, const std::string &dbName, const std::shared_ptr< thread::IoThreadPool > &pPool)
Definition: MongoBulkWriter.h:87
mongocxx::collection Collection
Definition: MongoBulkWriter.h:57
auto compose(future< TSeed > &&startFuture, TCreateNextFuture createNextFuture)
Definition: FutureUtils.h:102
void setFutureAt(size_t index, thread::future< BulkWriteResult > &&future)
Definition: MongoBulkWriter.h:203
Result of a bulk write operation to the database.
Definition: BulkWriteResult.h:27
BulkWriteResultFuture bulkWrite(const std::string &collectionName, const TContainer &entities, const AppendOperation< typename TContainer::value_type > &appendOperation)
Definition: MongoBulkWriter.h:212
future< std::vector< future< T > > > when_all(std::vector< future< T >> &&allFutures)
Returns a future that is signaled when all futures in allFutures complete.
Definition: FutureUtils.h:31
std::unordered_set< std::shared_ptr< const state::AccountState > > AccountStates
Definition: MongoBulkWriter.h:61
BulkWriteResultFuture bulkInsert(const std::string &collectionName, const TContainer &entities, const CreateDocuments< typename TContainer::value_type > &createDocuments)
Definition: MongoBulkWriter.h:118
BulkWriteResultFuture aggregateFuture()
Definition: MongoBulkWriter.h:199
void set_exception(std::exception_ptr pException)
Sets the result of this promise to pException.
Definition: Future.h:115
mongocxx::pool::entry pConnection
Definition: MongoBulkWriter.h:55
mongocxx::database Database
Definition: MongoBulkWriter.h:56
BulkWriteResultFuture bulkUpsert(const std::string &collectionName, const TContainer &entities, const CreateDocument< typename TContainer::value_type > &createDocument, const CreateFilter< typename TContainer::value_type > &createFilter)
Definition: MongoBulkWriter.h:133
Provides a way to access the result of an asynchronous operation.
Definition: Future.h:29
std::function< std::vector< bsoncxx::document::value >(const TEntity &, uint32_t)> CreateDocuments
Definition: MongoBulkWriter.h:71
std::string m_dbName
Definition: MongoBulkWriter.h:240
catapult_error< std::runtime_error > catapult_runtime_error
Definition: exceptions.h:87
std::shared_ptr< const thread::IoThreadPool > m_pPool
Definition: MongoBulkWriter.h:241
std::function< bsoncxx::document::value(const TEntity &)> CreateFilter
Definition: MongoBulkWriter.h:74
future< T > make_ready_future(T &&value)
Produces a future that is ready immediately and holds the given value.
Definition: Future.h:126
Definition: MongoBulkWriter.h:43
boost::asio::io_context & m_ioContext
Definition: MongoBulkWriter.h:242
std::vector< thread::future< BulkWriteResult > > m_futures
Definition: MongoBulkWriter.h:208
mongocxx::pool m_connectionPool
Definition: MongoBulkWriter.h:243
mongocxx::bulk_write Bulk
Definition: MongoBulkWriter.h:58
void set_value(T &&value)
Sets the result of this promise to value.
Definition: Future.h:110
consumer< mongocxx::bulk_write &, const TEntity &, uint32_t > AppendOperation
Definition: MongoBulkWriter.h:65
thread::future< bool > ParallelForPartition(boost::asio::io_context &ioContext, TItems &items, size_t numPartitions, TWorkCallback callback)
Definition: ParallelFor.h:30
MongoBulkWriter(const mongocxx::uri &uri, const std::string &dbName, const std::shared_ptr< thread::IoThreadPool > &pPool)
Definition: MongoBulkWriter.h:77
Definition: AddressExtractionExtension.cpp:28
BulkWriteContext(size_t numOperations)
Definition: MongoBulkWriter.h:195
Definition: MongoBulkWriter.h:45
std::function< bsoncxx::document::value(const TEntity &, uint32_t)> CreateDocument
Definition: MongoBulkWriter.h:68
thread::future< BulkWriteResult > handleBulkOperation(std::shared_ptr< BulkWriteParams > &&pBulkWriteParams)
Definition: MongoBulkWriter.h:164
void bulkWrite(BulkWriteParams &bulkWriteParams, thread::promise< BulkWriteResult > &promise)
Definition: MongoBulkWriter.h:175
std::function< void(TArgs...)> consumer
A consumer function.
Definition: functions.h:35
Stores the result of an asynchronous operation.
Definition: Future.h:85