CatapultServer  v0.5.0.1 (Elephant)
MongoCacheStorage.h
Go to the documentation of this file.
1 
21 #pragma once
26 #include <set>
27 #include <unordered_set>
28 
29 namespace catapult { namespace mongo { namespace storages {
30 
31  namespace detail {
33  template<typename TCacheTraits, typename TElementContainerType>
35  public:
37  static void RemoveCommonElements(TElementContainerType& addedElements, TElementContainerType& removedElements) {
38  auto commonIds = GetCommonIds(addedElements, removedElements);
39  RemoveElements(addedElements, commonIds);
40  RemoveElements(removedElements, commonIds);
41  }
42 
43  private:
44  static auto GetCommonIds(const TElementContainerType& addedElements, const TElementContainerType& removedElements) {
45  std::set<typename TCacheTraits::KeyType> addedElementIds;
46  for (const auto* pAddedElement : addedElements)
47  addedElementIds.insert(TCacheTraits::GetId(*pAddedElement));
48 
49  std::set<typename TCacheTraits::KeyType> removedElementIds;
50  for (const auto* pRemovedElement : removedElements)
51  removedElementIds.insert(TCacheTraits::GetId(*pRemovedElement));
52 
53  std::set<typename TCacheTraits::KeyType> commonIds;
54  std::set_intersection(
55  addedElementIds.cbegin(),
56  addedElementIds.cend(),
57  removedElementIds.cbegin(),
58  removedElementIds.cend(),
59  std::inserter(commonIds, commonIds.cbegin()));
60  return commonIds;
61  }
62 
63  static void RemoveElements(TElementContainerType& elements, std::set<typename TCacheTraits::KeyType>& ids) {
64  for (auto iter = elements.cbegin(); elements.cend() != iter;) {
65  if (ids.cend() != ids.find(TCacheTraits::GetId(**iter)))
66  iter = elements.erase(iter);
67  else
68  ++iter;
69  }
70  }
71  };
72  }
73 
75  template<typename TDescriptor>
78  using CacheType = typename TDescriptor::CacheType;
79 
81  using CacheDeltaType = typename TDescriptor::CacheDeltaType;
82 
84  using KeyType = typename TDescriptor::KeyType;
85 
87  using ModelType = typename TDescriptor::ValueType;
88 
90  static constexpr auto GetId = TDescriptor::GetKeyFromValue;
91  };
92 
94  template<typename TCacheTraits>
95  class MongoHistoricalCacheStorage : public ExternalCacheStorageT<typename TCacheTraits::CacheType> {
96  private:
98  typename TCacheTraits::CacheDeltaType,
99  typename TCacheTraits::CacheType::CacheValueType>;
100  using ElementContainerType = typename TCacheTraits::ElementContainerType;
101  using IdContainerType = typename TCacheTraits::IdContainerType;
102 
103  public:
106  : m_database(storageContext.createDatabaseConnection())
107  , m_errorPolicy(storageContext.createCollectionErrorPolicy(TCacheTraits::Collection_Name))
108  , m_bulkWriter(storageContext.bulkWriter())
109  , m_networkIdentifier(networkIdentifier)
110  {}
111 
112  private:
113  void saveDelta(const CacheChangesType& changes) override {
114  auto addedElements = changes.addedElements();
115  auto modifiedElements = changes.modifiedElements();
116  auto removedElements = changes.removedElements();
117 
118  // 1. remove elements common to both added and removed
120 
121  // 2. remove all modified and removed elements from db
122  auto modifiedIds = GetIds(modifiedElements);
123  auto removedIds = GetIds(removedElements);
124  modifiedIds.insert(removedIds.cbegin(), removedIds.cend());
125  removeAll(modifiedIds);
126 
127  // 3. insert new elements and modified elements into db
128  modifiedElements.insert(addedElements.cbegin(), addedElements.cend());
129  insertAll(modifiedElements);
130  }
131 
132  private:
133  void removeAll(const IdContainerType& ids) {
134  if (ids.empty())
135  return;
136 
137  auto collection = m_database[TCacheTraits::Collection_Name];
138 
139  auto filter = CreateDeleteFilter(ids);
140  auto deleteResult = collection.delete_many(filter.view());
141  m_errorPolicy.checkDeletedAtLeast(ids.size(), BulkWriteResult(deleteResult.get().result()), "removed and modified elements");
142  }
143 
144  void insertAll(const ElementContainerType& elements) {
145  if (elements.empty())
146  return;
147 
148  std::vector<typename TCacheTraits::ModelType> allModels;
149  for (const auto* pElement : elements) {
150  auto models = TCacheTraits::MapToMongoModels(*pElement, m_networkIdentifier);
151  std::move(models.begin(), models.end(), std::back_inserter(allModels));
152  }
153 
154  auto insertResults = m_bulkWriter.bulkInsert(TCacheTraits::Collection_Name, allModels, [](const auto& model, auto) {
155  return TCacheTraits::MapToMongoDocument(model);
156  }).get();
157 
158  auto aggregateResult = BulkWriteResult::Aggregate(thread::get_all(std::move(insertResults)));
159  m_errorPolicy.checkInserted(allModels.size(), aggregateResult, "modified and added elements");
160  }
161 
162  private:
163  static IdContainerType GetIds(const ElementContainerType& elements) {
164  IdContainerType ids;
165  for (const auto* pElement : elements)
166  ids.insert(TCacheTraits::GetId(*pElement));
167 
168  return ids;
169  }
170 
171  static bsoncxx::document::value CreateDeleteFilter(const IdContainerType& ids) {
172  using namespace bsoncxx::builder::stream;
173 
174  if (ids.empty())
175  return document() << finalize;
176 
177  document doc;
178  auto array = doc
179  << std::string(TCacheTraits::Id_Property_Name)
180  << open_document
181  << "$in"
182  << open_array;
183 
184  for (auto id : ids)
185  array << TCacheTraits::MapToMongoId(id);
186 
187  array << close_array;
188  doc << close_document;
189  return doc << finalize;
190  }
191 
192  private:
197  };
198 
200  template<typename TCacheTraits>
201  class MongoFlatCacheStorage : public ExternalCacheStorageT<typename TCacheTraits::CacheType> {
202  private:
204  using KeyType = typename TCacheTraits::KeyType;
205  using ModelType = typename TCacheTraits::ModelType;
206  using ElementContainerType = std::unordered_set<const ModelType*>;
207 
208  public:
211  : m_database(storageContext.createDatabaseConnection())
212  , m_errorPolicy(storageContext.createCollectionErrorPolicy(TCacheTraits::Collection_Name))
213  , m_bulkWriter(storageContext.bulkWriter())
214  , m_networkIdentifier(networkIdentifier)
215  {}
216 
217  private:
218  void saveDelta(const CacheChangesType& changes) override {
219  auto addedElements = changes.addedElements();
220  auto modifiedElements = changes.modifiedElements();
221  auto removedElements = changes.removedElements();
222 
223  // 1. remove elements common to both added and removed
225 
226  // 2. remove all removed elements from db
227  removeAll(removedElements);
228 
229  // 3. upsert new elements and modified elements into db
230  modifiedElements.insert(addedElements.cbegin(), addedElements.cend());
231  upsertAll(modifiedElements);
232  }
233 
234  private:
235  void removeAll(const ElementContainerType& elements) {
236  if (elements.empty())
237  return;
238 
239  auto deleteResults = m_bulkWriter.bulkDelete(TCacheTraits::Collection_Name, elements, CreateFilter).get();
240  auto aggregateResult = BulkWriteResult::Aggregate(thread::get_all(std::move(deleteResults)));
241  m_errorPolicy.checkDeleted(elements.size(), aggregateResult, "removed elements");
242  }
243 
244  void upsertAll(const ElementContainerType& elements) {
245  if (elements.empty())
246  return;
247 
248  auto createDocument = [networkIdentifier = m_networkIdentifier](const auto* pModel, auto) {
249  return TCacheTraits::MapToMongoDocument(*pModel, networkIdentifier);
250  };
251  auto upsertResults = m_bulkWriter.bulkUpsert(TCacheTraits::Collection_Name, elements, createDocument, CreateFilter).get();
252  auto aggregateResult = BulkWriteResult::Aggregate(thread::get_all(std::move(upsertResults)));
253  m_errorPolicy.checkUpserted(elements.size(), aggregateResult, "modified and added elements");
254  }
255 
256  private:
257  static bsoncxx::document::value CreateFilter(const ModelType* pModel) {
258  return CreateFilterByKey(TCacheTraits::GetId(*pModel));
259  }
260 
261  static bsoncxx::document::value CreateFilterByKey(const KeyType& key) {
262  using namespace bsoncxx::builder::stream;
263 
264  return document() << std::string(TCacheTraits::Id_Property_Name) << TCacheTraits::MapToMongoId(key) << finalize;
265  }
266 
267  private:
272  };
273 }}}
catapult::mongo::MongoBulkWriter::bulkDelete
BulkWriteResultFuture bulkDelete(const std::string &collectionName, const TContainer &entities, const CreateFilter< typename TContainer::value_type > &createFilter)
Deletes entities from the collection named collectionName matching the specified entity filter (creat...
Definition: MongoBulkWriter.h:151
catapult::mongo::storages::detail::MongoElementFilter
Defines a mongo element filter.
Definition: MongoCacheStorage.h:34
catapult::mongo::ExternalCacheStorageT
Typed interface for saving cache data to external storage.
Definition: ExternalCacheStorage.h:61
FutureUtils.h
catapult::mongo::storages::MongoHistoricalCacheStorage::m_errorPolicy
MongoErrorPolicy m_errorPolicy
Definition: MongoCacheStorage.h:194
catapult::mongo::storages::BasicMongoCacheStorageTraits::KeyType
typename TDescriptor::KeyType KeyType
Key type.
Definition: MongoCacheStorage.h:84
catapult::mongo::storages::MongoHistoricalCacheStorage::GetIds
static IdContainerType GetIds(const ElementContainerType &elements)
Definition: MongoCacheStorage.h:163
catapult::mongo::storages::detail::MongoElementFilter::RemoveElements
static void RemoveElements(TElementContainerType &elements, std::set< typename TCacheTraits::KeyType > &ids)
Definition: MongoCacheStorage.h:63
MapperUtils.h
catapult::mongo::storages::BasicMongoCacheStorageTraits::CacheDeltaType
typename TDescriptor::CacheDeltaType CacheDeltaType
Cache delta type.
Definition: MongoCacheStorage.h:81
catapult::mongo::storages::MongoFlatCacheStorage::m_errorPolicy
MongoErrorPolicy m_errorPolicy
Definition: MongoCacheStorage.h:269
catapult::mongo::MongoBulkWriter::bulkInsert
BulkWriteResultFuture bulkInsert(const std::string &collectionName, const TContainer &entities, const CreateDocument< typename TContainer::value_type > &createDocument)
Definition: MongoBulkWriter.h:103
catapult::mongo::storages::MongoFlatCacheStorage::saveDelta
void saveDelta(const CacheChangesType &changes) override
Saves cache changes to external storage.
Definition: MongoCacheStorage.h:218
catapult::thread::future::get
T get()
Returns the result of this future and blocks until the result is available.
Definition: Future.h:50
catapult::mongo::storages::MongoFlatCacheStorage::KeyType
typename TCacheTraits::KeyType KeyType
Definition: MongoCacheStorage.h:204
catapult::mongo::storages::MongoHistoricalCacheStorage
A mongo cache storage that persists historical cache data using delete and insert.
Definition: MongoCacheStorage.h:95
Id_Property_Name
static constexpr auto Id_Property_Name
Definition: MongoHashLockInfoCacheStorage.cpp:31
MongoStorageContext.h
catapult::mongo::storages::MongoHistoricalCacheStorage::saveDelta
void saveDelta(const CacheChangesType &changes) override
Saves cache changes to external storage.
Definition: MongoCacheStorage.h:113
Collection_Name
static constexpr auto Collection_Name
Definition: MongoHashLockInfoCacheStorage.cpp:30
catapult::mongo::storages::MongoHistoricalCacheStorage::insertAll
void insertAll(const ElementContainerType &elements)
Definition: MongoCacheStorage.h:144
catapult::mongo::storages::MongoHistoricalCacheStorage::IdContainerType
typename TCacheTraits::IdContainerType IdContainerType
Definition: MongoCacheStorage.h:101
catapult::mongo::storages::MongoFlatCacheStorage::m_bulkWriter
MongoBulkWriter & m_bulkWriter
Definition: MongoCacheStorage.h:270
catapult::mongo::storages::MongoFlatCacheStorage::ElementContainerType
std::unordered_set< const ModelType * > ElementContainerType
Definition: MongoCacheStorage.h:206
catapult::cache::SingleCacheChangesT
Provides common view of single sub cache changes.
Definition: CacheChanges.h:59
catapult::mongo::storages::MongoFlatCacheStorage::m_database
MongoDatabase m_database
Definition: MongoCacheStorage.h:268
catapult::mongo::storages::MongoHistoricalCacheStorage::MongoHistoricalCacheStorage
MongoHistoricalCacheStorage(MongoStorageContext &storageContext, model::NetworkIdentifier networkIdentifier)
Creates a cache storage around storageContext and networkIdentifier.
Definition: MongoCacheStorage.h:105
catapult::mongo::storages::MongoHistoricalCacheStorage::removeAll
void removeAll(const IdContainerType &ids)
Definition: MongoCacheStorage.h:133
catapult::mongo::storages::MongoHistoricalCacheStorage::ElementContainerType
typename TCacheTraits::ElementContainerType ElementContainerType
Definition: MongoCacheStorage.h:100
catapult::mongo::BulkWriteResult
Result of a bulk write operation to the database.
Definition: BulkWriteResult.h:27
catapult::mongo::BulkWriteResult::Aggregate
static BulkWriteResult Aggregate(const std::vector< BulkWriteResult > &results)
Aggregates all bulk write results in results into a single result.
Definition: BulkWriteResult.h:49
catapult::mongo::storages::BasicMongoCacheStorageTraits
Defines types for mongo cache storage given a cache descriptor.
Definition: MongoCacheStorage.h:76
catapult::mongo::MongoErrorPolicy
Error policy for checking mongo operation results.
Definition: MongoErrorPolicy.h:30
catapult::mongo::storages::MongoHistoricalCacheStorage::m_database
MongoDatabase m_database
Definition: MongoCacheStorage.h:193
catapult::mongo::storages::MongoFlatCacheStorage::MongoFlatCacheStorage
MongoFlatCacheStorage(MongoStorageContext &storageContext, model::NetworkIdentifier networkIdentifier)
Creates a cache storage around storageContext and networkIdentifier.
Definition: MongoCacheStorage.h:210
catapult::mongo::storages::MongoFlatCacheStorage::m_networkIdentifier
model::NetworkIdentifier m_networkIdentifier
Definition: MongoCacheStorage.h:271
catapult::mongo::storages::MongoHistoricalCacheStorage::CreateDeleteFilter
static bsoncxx::document::value CreateDeleteFilter(const IdContainerType &ids)
Definition: MongoCacheStorage.h:171
catapult::mongo::MongoBulkWriter::bulkUpsert
BulkWriteResultFuture bulkUpsert(const std::string &collectionName, const TContainer &entities, const CreateDocument< typename TContainer::value_type > &createDocument, const CreateFilter< typename TContainer::value_type > &createFilter)
Definition: MongoBulkWriter.h:133
catapult::mongo::storages::detail::MongoElementFilter::GetCommonIds
static auto GetCommonIds(const TElementContainerType &addedElements, const TElementContainerType &removedElements)
Definition: MongoCacheStorage.h:44
catapult::mongo::storages::MongoHistoricalCacheStorage::m_networkIdentifier
model::NetworkIdentifier m_networkIdentifier
Definition: MongoCacheStorage.h:196
catapult::mongo::MongoErrorPolicy::checkUpserted
void checkUpserted(uint64_t numExpected, const BulkWriteResult &result, const std::string &itemsDescription) const
Checks that result indicates exactly numExpected upsertions occurred given itemsDescription.
Definition: MongoErrorPolicy.cpp:73
catapult::mongo::storages::MongoFlatCacheStorage::CreateFilterByKey
static bsoncxx::document::value CreateFilterByKey(const KeyType &key)
Definition: MongoCacheStorage.h:261
catapult::mongo::storages::BasicMongoCacheStorageTraits::GetId
static constexpr auto GetId
Gets the key corresponding to a value.
Definition: MongoCacheStorage.h:90
MongoBulkWriter.h
catapult::mongo::storages::MongoFlatCacheStorage::ModelType
typename TCacheTraits::ModelType ModelType
Definition: MongoCacheStorage.h:205
catapult::mongo::storages::MongoFlatCacheStorage::upsertAll
void upsertAll(const ElementContainerType &elements)
Definition: MongoCacheStorage.h:244
catapult::mongo::MongoBulkWriter
Definition: MongoBulkWriter.h:43
catapult::model::NetworkIdentifier
NetworkIdentifier
Possible network identifiers.
Definition: NetworkInfo.h:45
catapult::mongo::MongoErrorPolicy::checkDeletedAtLeast
void checkDeletedAtLeast(uint64_t numExpected, const BulkWriteResult &result, const std::string &itemsDescription) const
Checks that result indicates at least numExpected deletions occurred given itemsDescription.
Definition: MongoErrorPolicy.cpp:51
catapult::mongo::storages::MongoFlatCacheStorage::removeAll
void removeAll(const ElementContainerType &elements)
Definition: MongoCacheStorage.h:235
catapult::mongo::storages::MongoHistoricalCacheStorage::m_bulkWriter
MongoBulkWriter & m_bulkWriter
Definition: MongoCacheStorage.h:195
catapult::mongo::storages::MongoFlatCacheStorage::CreateFilter
static bsoncxx::document::value CreateFilter(const ModelType *pModel)
Definition: MongoCacheStorage.h:257
catapult::cache::SingleCacheChangesT::addedElements
PointerContainer addedElements() const
Gets pointers to all added elements.
Definition: CacheChanges.h:78
catapult::mongo::storages::detail::MongoElementFilter::RemoveCommonElements
static void RemoveCommonElements(TElementContainerType &addedElements, TElementContainerType &removedElements)
Filters all elements common to added elements (addedElements) and removed elements (removedElements).
Definition: MongoCacheStorage.h:37
catapult::mongo::MongoErrorPolicy::checkDeleted
void checkDeleted(uint64_t numExpected, const BulkWriteResult &result, const std::string &itemsDescription) const
Checks that result indicates exactly numExpected deletions occurred given itemsDescription.
Definition: MongoErrorPolicy.cpp:43
catapult
Definition: AddressExtractionExtension.cpp:28
catapult::mongo::storages::BasicMongoCacheStorageTraits::CacheType
typename TDescriptor::CacheType CacheType
Cache type.
Definition: MongoCacheStorage.h:78
catapult::mongo::MongoStorageContext
Context for creating a mongo storage.
Definition: MongoStorageContext.h:30
catapult::cache::SingleCacheChangesT::modifiedElements
PointerContainer modifiedElements() const
Gets pointers to all modified elements.
Definition: CacheChanges.h:83
catapult::mongo::storages::BasicMongoCacheStorageTraits::ModelType
typename TDescriptor::ValueType ModelType
Model type.
Definition: MongoCacheStorage.h:87
catapult::mongo::storages::MongoFlatCacheStorage
A mongo cache storage that persists flat cache data using delete and upsert.
Definition: MongoCacheStorage.h:201
catapult::thread::get_all
std::vector< T > get_all(std::vector< future< T >> &&futures)
Definition: FutureUtils.h:141
catapult::mongo::MongoErrorPolicy::checkInserted
void checkInserted(uint64_t numExpected, const BulkWriteResult &result, const std::string &itemsDescription) const
Checks that result indicates exactly numExpected insertions occurred given itemsDescription.
Definition: MongoErrorPolicy.cpp:65
catapult::mongo::MongoDatabase
Represents a mongo database.
Definition: MongoDatabase.h:31
catapult::cache::SingleCacheChangesT::removedElements
PointerContainer removedElements() const
Gets pointers to all removed elements.
Definition: CacheChanges.h:88