OILS / vendor / souffle / datastructure / ConcurrentInsertOnlyHashMap.h View on Github | oils.pub

478 lines, 195 significant
1/*
2 * Souffle - A Datalog Compiler
3 * Copyright (c) 2021, The Souffle Developers. All rights reserved
4 * Licensed under the Universal Permissive License v 1.0 as shown at:
5 * - https://opensource.org/licenses/UPL
6 * - <souffle root>/licenses/SOUFFLE-UPL.txt
7 */
8#pragma once
9
#include "souffle/utility/ParallelUtil.h"

#include <array>
#include <atomic>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
19
20namespace souffle {
21namespace details {
22
23static const std::vector<std::pair<unsigned, unsigned>> ToPrime = {
24 // https://primes.utm.edu/lists/2small/0bit.html
25 // ((2^n) - k) is prime
26 // {n, k}
27 {4, 3}, // 2^4 - 3 = 13
28 {8, 5}, // 8^5 - 5 = 251
29 {9, 3}, {10, 3}, {11, 9}, {12, 3}, {13, 1}, {14, 3}, {15, 19}, {16, 15}, {17, 1}, {18, 5}, {19, 1},
30 {20, 3}, {21, 9}, {22, 3}, {23, 15}, {24, 3}, {25, 39}, {26, 5}, {27, 39}, {28, 57}, {29, 3},
31 {30, 35}, {31, 1}, {32, 5}, {33, 9}, {34, 41}, {35, 31}, {36, 5}, {37, 25}, {38, 45}, {39, 7},
32 {40, 87}, {41, 21}, {42, 11}, {43, 57}, {44, 17}, {45, 55}, {46, 21}, {47, 115}, {48, 59}, {49, 81},
33 {50, 27}, {51, 129}, {52, 47}, {53, 111}, {54, 33}, {55, 55}, {56, 5}, {57, 13}, {58, 27}, {59, 55},
34 {60, 93}, {61, 1}, {62, 57}, {63, 25}};
35
36// (2^64)-59 is the largest prime that fits in uint64_t
37static constexpr uint64_t LargestPrime64 = 18446744073709551557UL;
38
39// Return a prime greater or equal to the lower bound.
40// Return 0 if the next prime would not fit in 64 bits.
41inline static uint64_t GreaterOrEqualPrime(const uint64_t LowerBound) {
42 if (LowerBound > LargestPrime64) {
43 return 0;
44 }
45
46 for (std::size_t I = 0; I < ToPrime.size(); ++I) {
47 const uint64_t N = ToPrime[I].first;
48 const uint64_t K = ToPrime[I].second;
49 const uint64_t Prime = (1ULL << N) - K;
50 if (Prime >= LowerBound) {
51 return Prime;
52 }
53 }
54 return LargestPrime64;
55}
56
57template <typename T>
58struct Factory {
59 template <class... Args>
60 T& replace(T& Place, Args&&... Xs) {
61 Place = T{std::forward<Args>(Xs)...};
62 return Place;
63 }
64};
65
66} // namespace details
67
68/**
69 * A concurrent, almost lock-free associative hash-map that can only grow.
70 * Elements cannot be removed, the hash-map can only grow.
71 *
72 * The datastructures enables a configurable number of concurrent access lanes.
73 * Access to the datastructure is lock-free between different lanes.
74 * Concurrent accesses through the same lane is sequential.
75 *
76 * Growing the datastructure requires to temporarily lock all lanes to let a
77 * single lane perform the growing operation. The global lock is amortized
78 * thanks to an exponential growth strategy.
79 */
80template <class LanesPolicy, class Key, class T, class Hash = std::hash<Key>,
81 class KeyEqual = std::equal_to<Key>, class KeyFactory = details::Factory<Key>>
82class ConcurrentInsertOnlyHashMap {
83public:
84 class Node;
85
86 using key_type = Key;
87 using mapped_type = T;
88 using node_type = Node*;
89 using value_type = std::pair<const Key, const T>;
90 using size_type = std::size_t;
91 using hasher = Hash;
92 using key_equal = KeyEqual;
93 using self_type = ConcurrentInsertOnlyHashMap<Key, T, Hash, KeyEqual, KeyFactory>;
94 using lane_id = typename LanesPolicy::lane_id;
95
96 class Node {
97 public:
98 virtual ~Node() {}
99 virtual const value_type& value() const = 0;
100 virtual const key_type& key() const = 0;
101 virtual const mapped_type& mapped() const = 0;
102 };
103
104private:
105 // Each bucket of the hash-map is a linked list.
106 struct BucketList : Node {
107 virtual ~BucketList() {}
108
109 BucketList(const Key& K, const T& V, BucketList* N) : Value(K, V), Next(N) {}
110
111 const value_type& value() const {
112 return Value;
113 }
114
115 const key_type& key() const {
116 return Value.first;
117 }
118
119 const mapped_type& mapped() const {
120 return Value.second;
121 }
122
123 // Stores the couple of a key and its associated value.
124 value_type Value;
125
126 // Points to next element of the map that falls into the same bucket.
127 BucketList* Next;
128 };
129
130public:
131 /**
132 * @brief Construct a hash-map with at least the given number of buckets.
133 *
134 * Load-factor is initialized to 1.0.
135 */
136 ConcurrentInsertOnlyHashMap(const std::size_t LaneCount, const std::size_t Bucket_Count,
137 const Hash& hash = Hash(), const KeyEqual& key_equal = KeyEqual(),
138 const KeyFactory& key_factory = KeyFactory())
139 : Lanes(LaneCount), Hasher(hash), EqualTo(key_equal), Factory(key_factory) {
140 Size = 0;
141 BucketCount = details::GreaterOrEqualPrime(Bucket_Count);
142 if (BucketCount == 0) {
143 // Hopefuly this number of buckets is never reached.
144 BucketCount = std::numeric_limits<std::size_t>::max();
145 }
146 LoadFactor = 1.0;
147 Buckets = std::make_unique<std::atomic<BucketList*>[]>(BucketCount);
148 MaxSizeBeforeGrow = static_cast<std::size_t>(std::ceil(LoadFactor * (double)BucketCount));
149 }
150
151 ConcurrentInsertOnlyHashMap(const Hash& hash = Hash(), const KeyEqual& key_equal = KeyEqual(),
152 const KeyFactory& key_factory = KeyFactory())
153 : ConcurrentInsertOnlyHashMap(8, hash, key_equal, key_factory) {}
154
155 ~ConcurrentInsertOnlyHashMap() {
156 for (std::size_t Bucket = 0; Bucket < BucketCount; ++Bucket) {
157 BucketList* L = Buckets[Bucket].load(std::memory_order_relaxed);
158 while (L != nullptr) {
159 BucketList* BL = L;
160 L = L->Next;
161 delete (BL);
162 }
163 }
164 }
165
166 void setNumLanes(const std::size_t NumLanes) {
167 Lanes.setNumLanes(NumLanes);
168 }
169
170 /** @brief Create a fresh node initialized with the given value and a
171 * default-constructed key.
172 *
173 * The ownership of the returned node given to the caller.
174 */
175 node_type node(const T& V) {
176 BucketList* BL = new BucketList(Key{}, V, nullptr);
177 return static_cast<node_type>(BL);
178 }
179
180 /**
181 * @brief Lookup a value associated with a key.
182 *
183 * The search is done concurrently with possible insertion of the
184 * searched key. If the a nullpointer is returned, then the key
185 * was not associated with a value when the search began.
186 */
187 template <class K>
188 const value_type* weakFind(const lane_id H, const K& X) const {
189 const size_t HashValue = Hasher(X);
190 const auto Guard = Lanes.guard(H);
191 const size_t Bucket = HashValue % BucketCount;
192
193 BucketList* L = Buckets[Bucket].load(std::memory_order_acquire);
194 while (L != nullptr) {
195 if (EqualTo(L->Value.first, X)) {
196 // found the key
197 return &L->Value;
198 }
199 L = L->Next;
200 }
201 return nullptr;
202 }
203
204 /** @brief Checks if the map contains an element with the given key.
205 *
206 * The search is done concurrently with possible insertion of the
207 * searched key. If return true, then there is definitely an element
208 * with the specified key, if return false then there was no such
209 * element when the search began.
210 */
211 template <class K>
212 inline bool weakContains(const lane_id H, const K& X) const {
213 return weakFind(H, X) != nullptr;
214 }
215
216 /**
217 * @brief Inserts in-place if the key is not mapped, does nothing if the key already exists.
218 *
219 * @param H is the access lane.
220 *
221 * @param N is a node initialized with the mapped value to insert.
222 *
223 * @param Xs are arguments to forward to the hasher, the comparator and and
224 * the constructor of the key.
225 *
226 *
227 * Be Careful: the inserted node becomes available to concurrent lanes as
228 * soon as it is inserted, thus concurrent lanes may access the inserted
229 * value even before the inserting lane returns from this function.
230 * This is the reason why the inserting lane must prepare the inserted
231 * node's mapped value prior to calling this function.
232 *
233 * Be Careful: the given node remains the ownership of the caller unless
234 * the returned couple second member is true.
235 *
236 * Be Careful: the given node may not be inserted if the key already
237 * exists. The caller is in charge of handling that case and either
238 * dispose of the node or save it for the next insertion operation.
239 *
240 * Be Careful: Once the given node is actually inserted, its ownership is
241 * transfered to the hash-map. However it remains valid.
242 *
243 * If the key that compares equal to arguments Xs exists, then nothing is
244 * inserted. The returned value is the couple of the pointer to the
245 * existing value and the false boolean value.
246 *
247 * If the key that compares equal to arguments Xs does not exist, then the
248 * node N is updated with the key constructed from Xs, and inserted in the
249 * hash-map. The returned value is the couple of the pointer to the
250 * inserted value and the true boolean value.
251 *
252 */
253 template <class... Args>
254 std::pair<const value_type*, bool> get(const lane_id H, const node_type N, Args&&... Xs) {
255 // At any time a concurrent lane may insert the key before this lane.
256 //
257 // The synchronisation point is the atomic compare-and-exchange of the
258 // head of the bucket list that must contain the inserted node.
259 //
260 // The insertion algorithm is as follow:
261 //
262 // 1) Compute the key hash from Xs.
263 //
264 // 2) Lock the lane, that also prevent concurrent lanes from growing of
265 // the datastructure.
266 //
267 // 3) Determine the bucket where the element must be inserted.
268 //
269 // 4) Read the "last known head" of the bucket list. Other lanes
270 // inserting in the same bucket may update the bucket head
271 // concurrently.
272 //
273 // 5) Search the bucket list for the key by comparing with Xs starting
274 // from the last known head. If it is not the first round of search,
275 // then stop searching where the previous round of search started.
276 //
277 // 6) If the key is found return the couple of the value pointer and
278 // false (to indicate that this lane did not insert the node N).
279 //
280 // 7) It the key is not found prepare N for insertion by updating its
281 // key with Xs and chaining the last known head.
282 //
283 // 8) Try to exchange to last known head with N at the bucket head. The
284 // atomic compare and exchange operation guarantees that it only
285 // succeed if not other node was inserted in the bucket since we
286 // searched it, otherwise it fails when another lane has concurrently
287 // inserted a node in the same bucket.
288 //
289 // 9) If the atomic compare and exchange succeeded, the node has just
290 // been inserted by this lane. From now-on other lanes can also see
291 // the node. Return the couple of a pointer to the inserted value and
292 // the true boolean.
293 //
294 // 10) If the atomic compare and exchange failed, another node has been
295 // inserted by a concurrent lane in the same bucket. A new round of
296 // search is required -> restart from step 4.
297 //
298 //
299 // The datastructure is optionaly grown after step 9) before returning.
300
301 const value_type* Value = nullptr;
302 bool Inserted = false;
303
304 size_t NewSize;
305
306 // 1)
307 const size_t HashValue = Hasher(std::forward<Args>(Xs)...);
308
309 // 2)
310 Lanes.lock(H); // prevent the datastructure from growing
311
312 // 3)
313 const size_t Bucket = HashValue % BucketCount;
314
315 // 4)
316 // the head of the bucket's list last time we checked
317 BucketList* LastKnownHead = Buckets[Bucket].load(std::memory_order_acquire);
318 // the head of the bucket's list we already searched from
319 BucketList* SearchedFrom = nullptr;
320 // the node we want to insert
321 BucketList* const Node = static_cast<BucketList*>(N);
322
323 // Loop until either the node is inserted or the key is found in the bucket.
324 // Assuming bucket collisions are rare this loop is not executed more than once.
325 while (true) {
326 // 5)
327 // search the key in the bucket, stop where we already search at a
328 // previous iteration.
329 BucketList* L = LastKnownHead;
330 while (L != SearchedFrom) {
331 if (EqualTo(L->Value.first, std::forward<Args>(Xs)...)) {
332 // 6)
333 // Found the key, no need to insert.
334 // Although it's not strictly necessary, clear the node
335 // chaining to avoid leaving a dangling pointer there.
336 Value = &(L->Value);
337 Node->Next = nullptr;
338 goto Done;
339 }
340 L = L->Next;
341 }
342 SearchedFrom = LastKnownHead;
343
344 // 7)
345 // Not found in bucket, prepare node chaining.
346 Node->Next = LastKnownHead;
347 // The factory step could be done only once, but assuming bucket collisions are
348 // rare this whole loop is not executed more than once.
349 Factory.replace(const_cast<key_type&>(Node->Value.first), std::forward<Args>(Xs)...);
350
351 // 8)
352 // Try to insert the key in front of the bucket's list.
353 // This operation also performs step 4) because LastKnownHead is
354 // updated in the process.
355 if (Buckets[Bucket].compare_exchange_strong(
356 LastKnownHead, Node, std::memory_order_release, std::memory_order_relaxed)) {
357 // 9)
358 Inserted = true;
359 NewSize = ++Size;
360 Value = &(Node->Value);
361 goto AfterInserted;
362 }
363
364 // 10) concurrent insertion detected in this bucket, new round required.
365 }
366
367 AfterInserted : {
368 if (NewSize > MaxSizeBeforeGrow) {
369 tryGrow(H);
370 }
371 }
372
373 Done:
374
375 Lanes.unlock(H);
376
377 // 6,9)
378 return std::make_pair(Value, Inserted);
379 }
380
381private:
382 // The concurrent lanes manager.
383 LanesPolicy Lanes;
384
385 /// Hash function.
386 Hash Hasher;
387
388 /// Current number of buckets.
389 std::size_t BucketCount;
390
391 /// Atomic pointer to head bucket linked-list head.
392 std::unique_ptr<std::atomic<BucketList*>[]> Buckets;
393
394 /// The Equal-to function.
395 KeyEqual EqualTo;
396
397 KeyFactory Factory;
398
399 /// Current number of elements stored in the map.
400 std::atomic<std::size_t> Size;
401
402 /// Maximum size before the map should grow.
403 std::size_t MaxSizeBeforeGrow;
404
405 /// The load-factor of the map.
406 double LoadFactor;
407
408 // Grow the datastructure.
409 // Must be called while owning lane H.
410 bool tryGrow(const lane_id H) {
411 Lanes.beforeLockAllBut(H);
412
413 if (Size <= MaxSizeBeforeGrow) {
414 // Current size is fine
415 Lanes.beforeUnlockAllBut(H);
416 return false;
417 }
418
419 Lanes.lockAllBut(H);
420
421 { // safe section
422
423 // Compute the new number of buckets:
424 // Chose a prime number of buckets that ensures the desired load factor
425 // given the current number of elements in the map.
426 const std::size_t CurrentSize = Size;
427 assert(LoadFactor > 0);
428 const std::size_t NeededBucketCount =
429 static_cast<std::size_t>(std::ceil(static_cast<double>(CurrentSize) / LoadFactor));
430 std::size_t NewBucketCount = NeededBucketCount;
431 for (std::size_t I = 0; I < details::ToPrime.size(); ++I) {
432 const uint64_t N = details::ToPrime[I].first;
433 const uint64_t K = details::ToPrime[I].second;
434 const uint64_t Prime = (1ULL << N) - K;
435 if (Prime >= NeededBucketCount) {
436 NewBucketCount = Prime;
437 break;
438 }
439 }
440
441 std::unique_ptr<std::atomic<BucketList*>[]> NewBuckets =
442 std::make_unique<std::atomic<BucketList*>[]>(NewBucketCount);
443
444 // Rehash, this operation is costly because it requires to scan
445 // the existing elements, compute its hash to find its new bucket
446 // and insert in the new bucket.
447 //
448 // Maybe concurrent lanes could help using some job-stealing algorithm.
449 //
450 // Use relaxed memory ordering since the whole operation takes place
451 // in a critical section.
452 for (std::size_t B = 0; B < BucketCount; ++B) {
453 BucketList* L = Buckets[B].load(std::memory_order_relaxed);
454 while (L) {
455 BucketList* const Elem = L;
456 L = L->Next;
457
458 const auto& Value = Elem->Value;
459 std::size_t NewHash = Hasher(Value.first);
460 const std::size_t NewBucket = NewHash % NewBucketCount;
461 Elem->Next = NewBuckets[NewBucket].load(std::memory_order_relaxed);
462 NewBuckets[NewBucket].store(Elem, std::memory_order_relaxed);
463 }
464 }
465
466 Buckets = std::move(NewBuckets);
467 BucketCount = NewBucketCount;
468 MaxSizeBeforeGrow =
469 static_cast<std::size_t>(std::ceil(static_cast<double>(NewBucketCount) * LoadFactor));
470 }
471
472 Lanes.beforeUnlockAllBut(H);
473 Lanes.unlockAllBut(H);
474 return true;
475 }
476};
477
478} // namespace souffle