OILS / vendor / souffle / datastructure / ConcurrentFlyweight.h View on Github | oils.pub

527 lines, 287 significant
1/*
2 * Souffle - A Datalog Compiler
3 * Copyright (c) 2021, The Souffle Developers. All rights reserved
4 * Licensed under the Universal Permissive License v 1.0 as shown at:
5 * - https://opensource.org/licenses/UPL
6 * - <souffle root>/licenses/SOUFFLE-UPL.txt
7 */
8#pragma once
9
#include "ConcurrentInsertOnlyHashMap.h"
#include "souffle/utility/ParallelUtil.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <type_traits>
#include <utility>
14
15namespace souffle {
16
17/**
18 * A concurrent, almost lock-free associative datastructure that implements the
19 * Flyweight pattern. Assigns a unique index to each inserted key. Elements
20 * cannot be removed, the datastructure can only grow.
21 *
22 * The datastructure enables a configurable number of concurrent access lanes.
23 * Access to the datastructure is lock-free between different lanes.
24 * Concurrent accesses through the same lane is sequential.
25 *
26 * Growing the datastructure requires to temporarily lock all lanes to let a
27 * single lane perform the growing operation. The global lock is amortized
28 * thanks to an exponential growth strategy.
29 *
30 */
31template <class LanesPolicy, class Key, class Hash = std::hash<Key>, class KeyEqual = std::equal_to<Key>,
32 class KeyFactory = details::Factory<Key>>
33class ConcurrentFlyweight {
34public:
35 using lane_id = typename LanesPolicy::lane_id;
36 using index_type = std::size_t;
37 using key_type = Key;
38 using value_type = std::pair<const Key, const index_type>;
39 using pointer = const value_type*;
40 using reference = const value_type&;
41
42private:
43 // Effectively:
44 // data slot_type = NONE | END | Idx index_type
45 // The last two values in the domain of `index_type` are used to represent cases `NONE` and `END`
46 // TODO: strong type-def wrap this to prevent implicit conversions
47 using slot_type = index_type;
48 static constexpr slot_type NONE = std::numeric_limits<slot_type>::max(); // special case: `std::nullopt`
49 static constexpr slot_type END = NONE - 1; // special case: end iterator
50 static constexpr slot_type SLOT_MAX = END; // +1 the largest non-special slot value
51
52 static_assert(std::is_same_v<slot_type, index_type>,
53 "conversion helpers assume they're the underlying type, "
54 "with the last two values reserved for special cases");
55 static_assert(std::is_unsigned_v<slot_type>);
56
57 /// Converts from index to slot.
58 static slot_type slot(const index_type I) {
59 // not expected to happen. you'll run out of memory long before.
60 assert(I < SLOT_MAX && "can't represent index in `slot_type` domain");
61 return static_cast<slot_type>(I);
62 }
63
64 /// Converts from slot to index.
65 static index_type index(const slot_type S) {
66 assert(S < SLOT_MAX && "slot is sentinal value; can't convert to index !!");
67 return static_cast<index_type>(S);
68 }
69
70public:
71 /// Iterator with concurrent access to the datastructure.
72 struct Iterator {
73 using iterator_category = std::input_iterator_tag;
74 using value_type = ConcurrentFlyweight::value_type;
75 using pointer = ConcurrentFlyweight::pointer;
76 using reference = ConcurrentFlyweight::reference;
77
78 private:
79 const ConcurrentFlyweight* This;
80
81 /// Access lane to the datastructure.
82 lane_id Lane;
83
84 /// Current slot.
85 slot_type Slot;
86
87 /// Next slot that might be unassigned.
88 slot_type NextMaybeUnassignedSlot;
89
90 /// Handle that owns the next slot that might be unassigned.
91 slot_type NextMaybeUnassignedHandle = NONE;
92
93 public:
94 // The 'begin' iterator
95 Iterator(const ConcurrentFlyweight* This, const lane_id H)
96 : This(This), Lane(H), Slot(NONE), NextMaybeUnassignedSlot(0) {
97 FindNextMaybeUnassignedSlot();
98 MoveToNextAssignedSlot();
99 }
100
101 // The 'end' iterator
102 Iterator(const ConcurrentFlyweight* This)
103 : This(This), Lane(0), Slot(END), NextMaybeUnassignedSlot(END) {}
104
105 // The iterator starting at slot I, using access lane H.
106 Iterator(const ConcurrentFlyweight* This, const lane_id H, const index_type I)
107 : This(This), Lane(H), Slot(slot(I)), NextMaybeUnassignedSlot(slot(I)) {
108 FindNextMaybeUnassignedSlot();
109 MoveToNextAssignedSlot();
110 }
111
112 Iterator(const Iterator& That)
113 : This(That.This), Lane(That.Lane), Slot(That.Slot),
114 NextMaybeUnassignedSlot(That.NextMaybeUnassignedSlot),
115 NextMaybeUnassignedHandle(That.NextMaybeUnassignedHandle) {}
116
117 Iterator(Iterator&& That)
118 : This(That.This), Lane(That.Lane), Slot(That.Slot),
119 NextMaybeUnassignedSlot(That.NextMaybeUnassignedSlot),
120 NextMaybeUnassignedHandle(That.NextMaybeUnassignedHandle) {}
121
122 Iterator& operator=(const Iterator& That) {
123 This = That.This;
124 Lane = That.Lane;
125 Slot = That.Slot;
126 NextMaybeUnassignedSlot = That.NextMaybeUnassignedSlot;
127 NextMaybeUnassignedHandle = That.NextMaybeUnassignedHandle;
128 }
129
130 Iterator& operator=(Iterator&& That) {
131 This = That.This;
132 Lane = That.Lane;
133 Slot = That.Slot;
134 NextMaybeUnassignedSlot = That.NextMaybeUnassignedSlot;
135 NextMaybeUnassignedHandle = That.NextMaybeUnassignedHandle;
136 }
137
138 reference operator*() const {
139 const auto Guard = This->Lanes.guard(Lane);
140 return *This->Slots[index(Slot)];
141 }
142
143 pointer operator->() const {
144 const auto Guard = This->Lanes.guard(Lane);
145 return This->Slots[index(Slot)];
146 }
147
148 Iterator& operator++() {
149 MoveToNextAssignedSlot();
150 return *this;
151 }
152
153 Iterator operator++(int) {
154 Iterator Tmp = *this;
155 ++(*this);
156 return Tmp;
157 }
158
159 bool operator==(const Iterator& That) const {
160 return (This == That.This) && (Slot == That.Slot);
161 }
162
163 bool operator!=(const Iterator& That) const {
164 return (This != That.This) || (Slot != That.Slot);
165 }
166
167 private:
168 /** Find next slot after Slot that is maybe unassigned. */
169 void FindNextMaybeUnassignedSlot() {
170 NextMaybeUnassignedSlot = END;
171 for (lane_id I = 0; I < This->Lanes.lanes(); ++I) {
172 const auto Lane = This->Lanes.guard(I);
173 if ((Slot == NONE || This->Handles[I].NextSlot > Slot) &&
174 This->Handles[I].NextSlot < NextMaybeUnassignedSlot) {
175 NextMaybeUnassignedSlot = This->Handles[I].NextSlot;
176 NextMaybeUnassignedHandle = I;
177 }
178 }
179 if (NextMaybeUnassignedSlot == END) {
180 NextMaybeUnassignedSlot = This->NextSlot.load(std::memory_order_acquire);
181 NextMaybeUnassignedHandle = NONE;
182 }
183 }
184
185 /**
186 * Move Slot to next assigned slot and return true.
187 * Otherwise the end is reached and Slot is assigned `END` and return false.
188 */
189 bool MoveToNextAssignedSlot() {
190 static_assert(NONE == std::numeric_limits<slot_type>::max(),
191 "required for wrap around to 0 for begin-iterator-scan");
192 static_assert(NONE + 1 == 0, "required for wrap around to 0 for begin-iterator-scan");
193 while (Slot != END) {
194 assert(Slot + 1 < SLOT_MAX);
195 if (Slot + 1 < NextMaybeUnassignedSlot) { // next unassigned slot not reached
196 Slot = Slot + 1;
197 return true;
198 }
199
200 if (NextMaybeUnassignedHandle == NONE) { // reaching end
201 Slot = END;
202 NextMaybeUnassignedSlot = END;
203 NextMaybeUnassignedHandle = NONE;
204 return false;
205 }
206
207 if (NextMaybeUnassignedHandle != NONE) { // maybe reaching the next unassigned slot
208 This->Lanes.lock(NextMaybeUnassignedHandle);
209 const bool IsAssigned = (Slot + 1 < This->Handles[NextMaybeUnassignedHandle].NextSlot);
210 This->Lanes.unlock(NextMaybeUnassignedHandle);
211 Slot = Slot + 1;
212 FindNextMaybeUnassignedSlot();
213 if (IsAssigned) {
214 return true;
215 }
216 }
217 }
218 return false;
219 }
220 };
221
222 using iterator = Iterator;
223
224 /// Initialize the datastructure with the given capacity.
225 ConcurrentFlyweight(const std::size_t LaneCount, const std::size_t InitialCapacity,
226 const bool ReserveFirst, const Hash& hash = Hash(), const KeyEqual& key_equal = KeyEqual(),
227 const KeyFactory& key_factory = KeyFactory())
228 : Lanes(LaneCount), HandleCount(LaneCount),
229 Mapping(LaneCount, InitialCapacity, hash, key_equal, key_factory) {
230 Slots = std::make_unique<const value_type*[]>(InitialCapacity);
231 Handles = std::make_unique<Handle[]>(HandleCount);
232 NextSlot = (ReserveFirst ? 1 : 0);
233 SlotCount = InitialCapacity;
234 }
235
236 /// Initialize the datastructure with a capacity of 8 elements.
237 ConcurrentFlyweight(const std::size_t LaneCount, const bool ReserveFirst, const Hash& hash = Hash(),
238 const KeyEqual& key_equal = KeyEqual(), const KeyFactory& key_factory = KeyFactory())
239
240 : ConcurrentFlyweight(LaneCount, 8, ReserveFirst, hash, key_equal, key_factory) {}
241
242 /// Initialize the datastructure with a capacity of 8 elements.
243 ConcurrentFlyweight(const std::size_t LaneCount, const Hash& hash = Hash(),
244 const KeyEqual& key_equal = KeyEqual(), const KeyFactory& key_factory = KeyFactory())
245 : ConcurrentFlyweight(LaneCount, 8, false, hash, key_equal, key_factory) {}
246
247 virtual ~ConcurrentFlyweight() {
248 for (lane_id I = 0; I < HandleCount; ++I) {
249 if (Handles[I].NextNode) {
250 delete Handles[I].NextNode;
251 }
252 }
253 }
254
255 /**
256 * Change the number of lanes and possibly grow the number of handles.
257 * Do not use while threads are using this datastructure.
258 */
259 void setNumLanes(const std::size_t NumLanes) {
260 if (NumLanes > HandleCount) {
261 std::unique_ptr<Handle[]> NextHandles = std::make_unique<Handle[]>(NumLanes);
262 std::copy(Handles.get(), Handles.get() + HandleCount, NextHandles.get());
263 Handles.swap(NextHandles);
264 HandleCount = NumLanes;
265 }
266 Mapping.setNumLanes(NumLanes);
267 Lanes.setNumLanes(NumLanes);
268 }
269
270 /** Return a concurrent iterator on the first element. */
271 Iterator begin(const lane_id H) const {
272 return Iterator(this, H);
273 }
274
275 /** Return an iterator past the last element. */
276 Iterator end() const {
277 return Iterator(this);
278 }
279
280 /// Return true if the value is in the map.
281 template <typename K>
282 bool weakContains(const lane_id H, const K& X) const {
283 return Mapping.weakContains(H, X);
284 }
285
286 /// Return the value associated with the given index.
287 /// Assumption: the index is mapped in the datastructure.
288 const Key& fetch(const lane_id H, const index_type Idx) const {
289 const auto Lane = Lanes.guard(H);
290 assert(Idx < SlotCount.load(std::memory_order_relaxed));
291 return Slots[Idx]->first;
292 }
293
294 /// Return the pair of the index for the given value and a boolean
295 /// indicating if the value was already present (false) or inserted by this handle (true).
296 /// Insert the value and return a fresh index if the value is not
297 /// yet indexed.
298 template <class... Args>
299 std::pair<index_type, bool> findOrInsert(const lane_id H, Args&&... Xs) {
300 const auto Lane = Lanes.guard(H);
301 node_type Node;
302
303 slot_type Slot = Handles[H].NextSlot;
304
305 // Getting the next insertion slot for the current lane may require
306 // more than one attempts if the datastructure must grow and other
307 // threads are waiting for the same lane @p H.
308 while (true) {
309 if (Slot == NONE) {
310 // Reserve a slot for the lane, the datastructure might need to
311 // grow before the slot memory location becomes available.
312 Slot = NextSlot++;
313 Handles[H].NextSlot = Slot;
314 Handles[H].NextNode = Mapping.node(static_cast<index_type>(Slot));
315 }
316
317 if (Slot >= SlotCount.load(std::memory_order_relaxed)) {
318 // The slot memory location is not yet available, try to
319 // grow the datastructure. Other threads in other lanes might
320 // be attempting to grow the datastructure concurrently.
321 //
322 // Anyway when this call returns the Slot memory location is
323 // available.
324 tryGrow(H);
325
326 // Reload the Slot for the current lane since another thread
327 // using the same lane may take-over the lane during tryGrow()
328 // and consume the slot before the current thread is
329 // rescheduled on the lane.
330 Slot = Handles[H].NextSlot;
331 } else {
332 // From here the slot is known, allocated and available.
333 break;
334 }
335 }
336
337 Node = Handles[H].NextNode;
338
339 // Insert key in the index in advance.
340 Slots[Slot] = &Node->value();
341
342 auto Res = Mapping.get(H, Node, std::forward<Args>(Xs)...);
343 if (Res.second) {
344 // Inserted by self, slot is consumed, clear the lane's state.
345 Handles[H].clear();
346 return std::make_pair(static_cast<index_type>(Slot), true);
347 } else {
348 // Inserted concurrently by another thread, clearing the slot is
349 // not strictly needed but it avoids leaving a dangling pointer
350 // there.
351 //
352 // The reserved slot and node remains in the lane state so that
353 // they can be consumed by the next insertion operation on this
354 // lane.
355 Slots[Slot] = nullptr;
356 return std::make_pair(Res.first->second, false);
357 }
358 }
359
360private:
361 using map_type = ConcurrentInsertOnlyHashMap<LanesPolicy, Key, index_type, Hash, KeyEqual, KeyFactory>;
362 using node_type = typename map_type::node_type;
363
364 struct Handle {
365 void clear() {
366 NextSlot = NONE;
367 NextNode = nullptr;
368 }
369
370 slot_type NextSlot = NONE;
371 node_type NextNode = nullptr;
372 };
373
374protected:
375 // The concurrency manager.
376 LanesPolicy Lanes;
377
378private:
379 // Number of handles
380 std::size_t HandleCount;
381
382 // Handle for each concurrent lane.
383 std::unique_ptr<Handle[]> Handles;
384
385 // Slots[I] points to the value associated with index I.
386 std::unique_ptr<const value_type*[]> Slots;
387
388 // The map from keys to index.
389 map_type Mapping;
390
391 // Next available slot.
392 std::atomic<slot_type> NextSlot;
393
394 // Number of slots.
395 std::atomic<slot_type> SlotCount;
396
397 /// Grow the datastructure if needed.
398 bool tryGrow(const lane_id H) {
399 // This call may release and re-acquire the lane to
400 // allow progress of a concurrent growing operation.
401 //
402 // It is possible that another thread is waiting to
403 // enter the same lane, and that other thread might
404 // take and leave the lane before the current thread
405 // re-acquires it.
406 Lanes.beforeLockAllBut(H);
407
408 if (NextSlot < SlotCount) {
409 // Current size is fine
410 Lanes.beforeUnlockAllBut(H);
411 return false;
412 }
413
414 Lanes.lockAllBut(H);
415
416 { // safe section
417 const std::size_t CurrentSize = SlotCount;
418 std::size_t NewSize = (CurrentSize << 1); // double size policy
419 while (NewSize < NextSlot) {
420 NewSize <<= 1; // double size
421 }
422 std::unique_ptr<const value_type*[]> NewSlots = std::make_unique<const value_type*[]>(NewSize);
423 std::memcpy(NewSlots.get(), Slots.get(), sizeof(const value_type*) * CurrentSize);
424 Slots = std::move(NewSlots);
425 SlotCount = NewSize;
426 }
427
428 Lanes.beforeUnlockAllBut(H);
429 Lanes.unlockAllBut(H);
430
431 return true;
432 }
433};
434
#ifdef _OPENMP
/**
 * A Flyweight datastructure with concurrent access specialized for OpenMP.
 *
 * Every operation is forwarded to the concurrent flyweight, using the lane
 * reported by `Lanes.threadLane()` for the calling thread.
 */
template <class Key, class Hash = std::hash<Key>, class KeyEqual = std::equal_to<Key>,
        class KeyFactory = details::Factory<Key>>
class OmpFlyweight : protected ConcurrentFlyweight<ConcurrentLanes, Key, Hash, KeyEqual, KeyFactory> {
public:
    using Base = ConcurrentFlyweight<ConcurrentLanes, Key, Hash, KeyEqual, KeyFactory>;
    using index_type = typename Base::index_type;
    using lane_id = typename Base::lane_id;
    using iterator = typename Base::iterator;

    /// Build the flyweight; all arguments are handed over to the base class constructor.
    explicit OmpFlyweight(const std::size_t LaneCount, const std::size_t InitialCapacity = 8,
            const bool ReserveFirst = false, const Hash& hash = Hash(),
            const KeyEqual& key_equal = KeyEqual(), const KeyFactory& key_factory = KeyFactory())
            : Base(LaneCount, InitialCapacity, ReserveFirst, hash, key_equal, key_factory) {}

    /// Iterator on the first element, bound to the calling thread's lane.
    iterator begin() const {
        const lane_id ThreadLane = Base::Lanes.threadLane();
        return Base::begin(ThreadLane);
    }

    /// Iterator past the last element.
    iterator end() const {
        return Base::end();
    }

    /// Return true if the value is in the map, looked up through the calling thread's lane.
    template <typename K>
    bool weakContains(const K& X) const {
        const lane_id ThreadLane = Base::Lanes.threadLane();
        return Base::weakContains(ThreadLane, X);
    }

    /// Return the key associated with index `Idx`, read through the calling thread's lane.
    const Key& fetch(const index_type Idx) const {
        const lane_id ThreadLane = Base::Lanes.threadLane();
        return Base::fetch(ThreadLane, Idx);
    }

    /// Find or insert the key built from `Xs`, through the calling thread's lane.
    template <class... Args>
    std::pair<index_type, bool> findOrInsert(Args&&... Xs) {
        const lane_id ThreadLane = Base::Lanes.threadLane();
        return Base::findOrInsert(ThreadLane, std::forward<Args>(Xs)...);
    }
};
#endif
474
475/**
476 * A Flyweight datastructure with sequential access.
477 *
478 * Reuse the concurrent flyweight with a single access handle.
479 */
480template <class Key, class Hash = std::hash<Key>, class KeyEqual = std::equal_to<Key>,
481 class KeyFactory = details::Factory<Key>>
482class SeqFlyweight : protected ConcurrentFlyweight<SeqConcurrentLanes, Key, Hash, KeyEqual, KeyFactory> {
483public:
484 using Base = ConcurrentFlyweight<SeqConcurrentLanes, Key, Hash, KeyEqual, KeyFactory>;
485 using index_type = typename Base::index_type;
486 using lane_id = typename Base::lane_id;
487 using iterator = typename Base::iterator;
488
489 explicit SeqFlyweight(const std::size_t NumLanes, const std::size_t InitialCapacity = 8,
490 const bool ReserveFirst = false, const Hash& hash = Hash(),
491 const KeyEqual& key_equal = KeyEqual(), const KeyFactory& key_factory = KeyFactory())
492 : Base(NumLanes, InitialCapacity, ReserveFirst, hash, key_equal, key_factory) {}
493
494 iterator begin() const {
495 return Base::begin(0);
496 }
497
498 iterator end() const {
499 return Base::end();
500 }
501
502 template <typename K>
503 bool weakContains(const K& X) const {
504 return Base::weakContains(0, X);
505 }
506
507 const Key& fetch(const index_type Idx) const {
508 return Base::fetch(0, Idx);
509 }
510
511 template <class... Args>
512 std::pair<index_type, bool> findOrInsert(Args&&... Xs) {
513 return Base::findOrInsert(0, std::forward<Args>(Xs)...);
514 }
515};
516
517#ifdef _OPENMP
518template <class Key, class Hash = std::hash<Key>, class KeyEqual = std::equal_to<Key>,
519 class KeyFactory = details::Factory<Key>>
520using FlyweightImpl = OmpFlyweight<Key, Hash, KeyEqual, KeyFactory>;
521#else
522template <class Key, class Hash = std::hash<Key>, class KeyEqual = std::equal_to<Key>,
523 class KeyFactory = details::Factory<Key>>
524using FlyweightImpl = SeqFlyweight<Key, Hash, KeyEqual, KeyFactory>;
525#endif
526
527} // namespace souffle