OILS / vendor / souffle / utility / ParallelUtil.h View on Github | oils.pub

884 lines, 412 significant
1/*
2 * Souffle - A Datalog Compiler
3 * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved
4 * Licensed under the Universal Permissive License v 1.0 as shown at:
5 * - https://opensource.org/licenses/UPL
6 * - <souffle root>/licenses/SOUFFLE-UPL.txt
7 */
8
9/************************************************************************
10 *
11 * @file ParallelUtil.h
12 *
13 * A set of utilities abstracting from the underlying parallel library.
14 * Currently supported APIs: OpenMP and Cilk
15 *
16 ***********************************************************************/
17
18#pragma once
19
20#include <atomic>
21#include <cassert>
22#include <cstddef>
23#include <memory>
24#include <new>
25
// Cache-line size hints used to align data that is accessed by different
// threads.
//
// The constants of the standard library are only used when the library
// advertises them with a feature-test value other than 201703L, see
// https://bugs.llvm.org/show_bug.cgi?id=41423
#if defined(__cpp_lib_hardware_interference_size) && (__cpp_lib_hardware_interference_size != 201703L)
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │
// ...
// Fallback estimate. The type is spelled std::max_align_t because <cstddef>
// only guarantees the name inside namespace std.
constexpr std::size_t hardware_constructive_interference_size = 2 * sizeof(std::max_align_t);
constexpr std::size_t hardware_destructive_interference_size = 2 * sizeof(std::max_align_t);
#endif
36
#ifdef _OPENMP

/**
 * Implementation of parallel control flow constructs utilizing OpenMP
 */

#include <omp.h>

// `pthread_yield` is the spelling used by this file for "give up the CPU";
// map it onto the primitive each platform provides.
#if defined(__APPLE__)
// pthread_yield_np is declared by <pthread.h>
#include <pthread.h>
#define pthread_yield pthread_yield_np
#elif !defined(_MSC_VER)
#include <sched.h>
// pthread_yield is deprecated and should be replaced by sched_yield
#define pthread_yield sched_yield
#else
#include <thread>
// keep <windows.h> from defining the min/max macros, without redefining
// NOMINMAX when the build already sets it
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <windows.h>
#define pthread_yield std::this_thread::yield
#endif

#ifdef _MSC_VER
// support for a parallel region
#define PARALLEL_START __pragma(omp parallel) {
#define PARALLEL_END }

// support for parallel loops
#define pfor __pragma(omp for schedule(dynamic)) for
#else
// support for a parallel region
#define PARALLEL_START _Pragma("omp parallel") {
#define PARALLEL_END }

// support for parallel loops
#define pfor _Pragma("omp for schedule(dynamic)") for
#endif

// spawn and sync are processed sequentially (overhead too expensive)
#define task_spawn
#define task_sync

// section start / end => corresponding OpenMP pragmas
// NOTE: disabled since it causes performance losses
//#define SECTIONS_START _Pragma("omp parallel sections") {
// NOTE: we stick to flat-level parallelism since it is faster due to thread pooling
#define SECTIONS_START {
#define SECTIONS_END }

// the markers for a single section
//#define SECTION_START _Pragma("omp section") {
#define SECTION_START {
#define SECTION_END }

// a macro to create an operation context
#define CREATE_OP_CONTEXT(NAME, INIT) [[maybe_unused]] auto NAME = INIT;
#define READ_OP_CONTEXT(NAME) NAME

#else

// support for a parallel region => sequential execution
#define PARALLEL_START {
#define PARALLEL_END }

// support for parallel loops => simple sequential loop
#define pfor for

// spawn and sync not supported
#define task_spawn
#define task_sync

// sections are processed sequentially
#define SECTIONS_START {
#define SECTIONS_END }

// sections are inlined
#define SECTION_START {
#define SECTION_END }

// a macro to create an operation context
#define CREATE_OP_CONTEXT(NAME, INIT) [[maybe_unused]] auto NAME = INIT;
#define READ_OP_CONTEXT(NAME) NAME

// mark as sequential
#define IS_SEQUENTIAL

#endif

// exactly one of IS_SEQUENTIAL / IS_PARALLEL is defined from here on
#ifndef IS_SEQUENTIAL
#define IS_PARALLEL
#endif

// MAX_THREADS: what OpenMP reports as the maximum number of threads in a
// parallel build, the constant 1 otherwise
#ifdef IS_PARALLEL
#include <mutex>
#include <vector>
#define MAX_THREADS (omp_get_max_threads())
#else
#define MAX_THREADS (1)
#endif
135
136namespace souffle {
137
/**
 * Lane-locking facade without any locking: it always reports a single lane
 * and every lock/unlock operation is a no-op.
 */
struct SeqConcurrentLanes {
    /** Stand-in for a lock guard; it owns nothing. */
    struct TrivialLock {
        // NOTE(review): presumably the user-provided destructor keeps guard
        // variables from being reported as unused - confirm.
        ~TrivialLock() {}
    };

    using lane_id = std::size_t;
    using unique_lock_type = TrivialLock;

    /** The requested number of lanes is accepted and ignored. */
    explicit SeqConcurrentLanes(std::size_t = 1) {}
    SeqConcurrentLanes(const SeqConcurrentLanes&) = delete;
    SeqConcurrentLanes(SeqConcurrentLanes&&) = delete;

    virtual ~SeqConcurrentLanes() = default;

    /** @return the number of lanes, which is always 1 */
    std::size_t lanes() const {
        return std::size_t{1};
    }

    /** Ignored; the number of lanes stays 1. */
    void setNumLanes(const std::size_t) {}

    /** @return a guard object that holds no lock */
    unique_lock_type guard(const lane_id) const {
        return unique_lock_type{};
    }

    // The remaining operations mirror the locking interface and do nothing.

    void lock(const lane_id) const {}

    void unlock(const lane_id) const {}

    void beforeLockAllBut(const lane_id) const {}

    void beforeUnlockAllBut(const lane_id) const {}

    void lockAllBut(const lane_id) const {}

    void unlockAllBut(const lane_id) const {}
};
186
187#ifdef IS_PARALLEL
188
/**
 * A small utility class for implementing simple locks on top of a
 * std::mutex.
 */
class Lock {
    // the underlying mutex
    std::mutex mux;

public:
    /**
     * Move-only token for a held mutex: the mutex is locked when the lease
     * is created from it and unlocked when the lease is destroyed. A lease
     * that has been moved from no longer refers to a mutex and unlocks
     * nothing.
     */
    struct Lease {
        Lease(std::mutex& target) : mux(&target) {
            mux->lock();
        }
        Lease(Lease&& other) : mux(other.mux) {
            // the source must not unlock the mutex any more
            other.mux = nullptr;
        }
        Lease(const Lease& other) = delete;
        ~Lease() {
            if (mux == nullptr) {
                return;
            }
            mux->unlock();
        }

    protected:
        // the leased mutex, nullptr once moved from
        std::mutex* mux;
    };

    /** Acquires the lock for the life-cycle of the returned lease. */
    Lease acquire() {
        return Lease(mux);
    }

    /** Blocks until the lock has been obtained. */
    void lock() {
        mux.lock();
    }

    /** @return true if the lock has been obtained, false otherwise */
    bool try_lock() {
        const bool obtained = mux.try_lock();
        return obtained;
    }

    /** Releases the lock. */
    void unlock() {
        mux.unlock();
    }
};
232
233// /* valuable source: http://locklessinc.com/articles/locks/ */
234
235namespace detail {
236
237/* Pause instruction to prevent excess processor bus usage */
238#if defined _MSC_VER
239#define cpu_relax() YieldProcessor()
240#else
241#ifdef __x86_64__
242#define cpu_relax() asm volatile("pause\n" : : : "memory")
243#else
244#define cpu_relax() asm volatile("" : : : "memory")
245#endif
246#endif
247
248/**
249 * A utility class managing waiting operations for spin locks.
250 */
251class Waiter {
252 int i = 0;
253
254public:
255 Waiter() = default;
256
257 /**
258 * Conducts a wait operation.
259 */
260 void operator()() {
261 ++i;
262 if ((i % 1000) == 0) {
263 // there was no progress => let others work
264 pthread_yield();
265 } else {
266 // relax this CPU
267 cpu_relax();
268 }
269 }
270};
271} // namespace detail
272
273/* compare: http://en.cppreference.com/w/cpp/atomic/atomic_flag */
274class SpinLock {
275 std::atomic<int> lck{0};
276
277public:
278 SpinLock() = default;
279
280 void lock() {
281 detail::Waiter wait;
282 while (!try_lock()) {
283 wait();
284 }
285 }
286
287 bool try_lock() {
288 int should = 0;
289 return lck.compare_exchange_weak(should, 1, std::memory_order_acquire);
290 }
291
292 void unlock() {
293 lck.store(0, std::memory_order_release);
294 }
295};
296
297/**
298 * A read/write lock for increased access performance on a
299 * read-heavy use case.
300 */
301class ReadWriteLock {
302 /**
303 * Based on paper:
304 * Scalable Reader-Writer Synchronization
305 * for Shared-Memory Multiprocessors
306 *
307 * Layout of the lock:
308 * 31 ... 2 1 0
309 * +-------------------------+--------------------+--------------------+
310 * | interested reader count | waiting writer | active writer flag |
311 * +-------------------------+--------------------+--------------------+
312 */
313
314 std::atomic<int> lck{0};
315
316public:
317 ReadWriteLock() = default;
318
319 void start_read() {
320 // add reader
321 auto r = lck.fetch_add(4, std::memory_order_acquire);
322
323 // wait until there is no writer any more
324 detail::Waiter wait;
325 while (r & 0x3) {
326 // release reader
327 end_read();
328
329 // wait a bit
330 wait();
331
332 // apply as a reader again
333 r = lck.fetch_add(4, std::memory_order_acquire);
334
335 } // while there is a writer => spin
336 }
337
338 void end_read() {
339 lck.fetch_sub(4, std::memory_order_release);
340 }
341
342 void start_write() {
343 detail::Waiter wait;
344
345 // set wait-for-write bit
346 auto stat = lck.fetch_or(2, std::memory_order_acquire);
347 while (stat & 0x2) {
348 wait();
349 stat = lck.fetch_or(2, std::memory_order_acquire);
350 }
351
352 // the caller may starve here ...
353 int should = 2;
354 while (!lck.compare_exchange_strong(
355 should, 1, std::memory_order_acquire, std::memory_order_relaxed)) {
356 wait();
357 should = 2;
358 }
359 }
360
361 bool try_write() {
362 int should = 0;
363 return lck.compare_exchange_strong(should, 1, std::memory_order_acquire, std::memory_order_relaxed);
364 }
365
366 void end_write() {
367 lck.fetch_sub(1, std::memory_order_release);
368 }
369
370 bool try_upgrade_to_write() {
371 int should = 4;
372 return lck.compare_exchange_strong(should, 1, std::memory_order_acquire, std::memory_order_relaxed);
373 }
374
375 void downgrade_to_read() {
376 // delete write bit + set num readers to 1
377 lck.fetch_add(3, std::memory_order_release);
378 }
379};
380
381/**
382 * An implementation of an optimistic r/w lock.
383 */
384class OptimisticReadWriteLock {
385 /**
386 * The version number utilized for the synchronization.
387 *
388 * Usage:
389 * - even version numbers are stable versions, not being updated
390 * - odd version numbers are temporary versions, currently being updated
391 */
392 std::atomic<int> version{0};
393
394public:
395 /**
396 * The lease utilized to link start and end of read phases.
397 */
398 class Lease {
399 friend class OptimisticReadWriteLock;
400 int version;
401
402 public:
403 Lease(int version = 0) : version(version) {}
404 Lease(const Lease& lease) = default;
405 Lease& operator=(const Lease& other) = default;
406 Lease& operator=(Lease&& other) = default;
407 };
408
409 /**
410 * A default constructor initializing the lock.
411 */
412 OptimisticReadWriteLock() = default;
413
414 /**
415 * Starts a read phase, making sure that there is currently no
416 * active concurrent modification going on. The resulting lease
417 * enables the invoking process to later-on verify that no
418 * concurrent modifications took place.
419 */
420 Lease start_read() {
421 detail::Waiter wait;
422
423 // get a snapshot of the lease version
424 auto v = version.load(std::memory_order_acquire);
425
426 // spin while there is a write in progress
427 while ((v & 0x1) == 1) {
428 // wait for a moment
429 wait();
430 // get an updated version
431 v = version.load(std::memory_order_acquire);
432 }
433
434 // done
435 return Lease(v);
436 }
437
438 /**
439 * Tests whether there have been concurrent modifications since
440 * the given lease has been issued.
441 *
442 * @return true if no updates have been conducted, false otherwise
443 */
444 bool validate(const Lease& lease) {
445 // check whether version number has changed in the mean-while
446 std::atomic_thread_fence(std::memory_order_acquire);
447 return lease.version == version.load(std::memory_order_relaxed);
448 }
449
450 /**
451 * Ends a read phase by validating the given lease.
452 *
453 * @return true if no updates have been conducted since the
454 * issuing of the lease, false otherwise
455 */
456 bool end_read(const Lease& lease) {
457 // check lease in the end
458 return validate(lease);
459 }
460
461 /**
462 * Starts a write phase on this lock be ensuring exclusive access
463 * and invalidating any existing read lease.
464 */
465 void start_write() {
466 detail::Waiter wait;
467
468 // set last bit => make it odd
469 auto v = version.fetch_or(0x1, std::memory_order_acquire);
470
471 // check for concurrent writes
472 while ((v & 0x1) == 1) {
473 // wait for a moment
474 wait();
475 // get an updated version
476 v = version.fetch_or(0x1, std::memory_order_acquire);
477 }
478
479 // done
480 }
481
482 /**
483 * Tries to start a write phase unless there is a currently ongoing
484 * write operation. In this case no write permission will be obtained.
485 *
486 * @return true if write permission has been granted, false otherwise.
487 */
488 bool try_start_write() {
489 auto v = version.fetch_or(0x1, std::memory_order_acquire);
490 return !(v & 0x1);
491 }
492
493 /**
494 * Updates a read-lease to a write permission by a) validating that the
495 * given lease is still valid and b) making sure that there is no currently
496 * ongoing write operation.
497 *
498 * @return true if the lease was still valid and write permissions could
499 * be granted, false otherwise.
500 */
501 bool try_upgrade_to_write(const Lease& lease) {
502 auto v = version.fetch_or(0x1, std::memory_order_acquire);
503
504 // check whether write privileges have been gained
505 if (v & 0x1) return false; // there is another writer already
506
507 // check whether there was no write since the gain of the read lock
508 if (lease.version == v) return true;
509
510 // if there was, undo write update
511 abort_write();
512
513 // operation failed
514 return false;
515 }
516
517 /**
518 * Aborts a write operation by reverting to the version number before
519 * starting the ongoing write, thereby re-validating existing leases.
520 */
521 void abort_write() {
522 // reset version number
523 version.fetch_sub(1, std::memory_order_release);
524 }
525
526 /**
527 * Ends a write operation by giving up the associated exclusive access
528 * to the protected data and abandoning the provided write permission.
529 */
530 void end_write() {
531 // update version number another time
532 version.fetch_add(1, std::memory_order_release);
533 }
534
535 /**
536 * Tests whether currently write permissions have been granted to any
537 * client by this lock.
538 *
539 * @return true if so, false otherwise
540 */
541 bool is_write_locked() const {
542 return version & 0x1;
543 }
544};
545
546/** Concurrent lanes locking mechanism. */
547struct MutexConcurrentLanes {
548 using lane_id = std::size_t;
549 using unique_lock_type = std::unique_lock<std::mutex>;
550
551 explicit MutexConcurrentLanes(const std::size_t Sz) : Size(Sz), Attribution(attribution(Sz)) {
552 Lanes = std::make_unique<Lane[]>(Sz);
553 }
554 MutexConcurrentLanes(const MutexConcurrentLanes&) = delete;
555 MutexConcurrentLanes(MutexConcurrentLanes&&) = delete;
556
557 virtual ~MutexConcurrentLanes() {}
558
559 // Return the number of lanes.
560 std::size_t lanes() const {
561 return Size;
562 }
563
564 // Select a lane
565 lane_id getLane(std::size_t I) const {
566 if (Attribution == lane_attribution::mod_power_of_2) {
567 return I & (Size - 1);
568 } else {
569 return I % Size;
570 }
571 }
572
573 /** Change the number of lanes.
574 * DO not use while threads are using this object.
575 */
576 void setNumLanes(const std::size_t NumLanes) {
577 Size = (NumLanes == 0 ? 1 : NumLanes);
578 Attribution = attribution(Size);
579 Lanes = std::make_unique<Lane[]>(Size);
580 }
581
582 unique_lock_type guard(const lane_id Lane) const {
583 return unique_lock_type(Lanes[Lane].Access);
584 }
585
586 // Lock the given lane.
587 // Must eventually be followed by unlock(Lane).
588 void lock(const lane_id Lane) const {
589 Lanes[Lane].Access.lock();
590 }
591
592 // Unlock the given lane.
593 // Must already be the owner of the lane's lock.
594 void unlock(const lane_id Lane) const {
595 Lanes[Lane].Access.unlock();
596 }
597
598 // Acquire the capability to lock all other lanes than the given one.
599 //
600 // Must eventually be followed by beforeUnlockAllBut(Lane).
601 void beforeLockAllBut(const lane_id Lane) const {
602 if (!BeforeLockAll.try_lock()) {
603 // If we cannot get the lock immediately, it means it was acquired
604 // concurrently by another lane that will also try to acquire our
605 // lane lock.
606 // So we release our lane lock to let the concurrent operation
607 // progress.
608 unlock(Lane);
609 BeforeLockAll.lock();
610 lock(Lane);
611 }
612 }
613
614 // Release the capability to lock all other lanes than the given one.
615 //
616 // Must already be the owner of that capability.
617 void beforeUnlockAllBut(const lane_id) const {
618 BeforeLockAll.unlock();
619 }
620
621 // Lock all lanes but the given one.
622 //
623 // Must already have acquired the capability to lock all other lanes
624 // by calling beforeLockAllBut(Lane).
625 //
626 // Must eventually be followed by unlockAllBut(Lane).
627 void lockAllBut(const lane_id Lane) const {
628 for (std::size_t I = 0; I < Size; ++I) {
629 if (I != Lane) {
630 Lanes[I].Access.lock();
631 }
632 }
633 }
634
635 // Unlock all lanes but the given one.
636 // Must already be the owner of all the lanes' locks.
637 void unlockAllBut(const lane_id Lane) const {
638 for (std::size_t I = 0; I < Size; ++I) {
639 if (I != Lane) {
640 Lanes[I].Access.unlock();
641 }
642 }
643 }
644
645private:
646 enum lane_attribution { mod_power_of_2, mod_other };
647
648 struct Lane {
649 alignas(hardware_destructive_interference_size) std::mutex Access;
650 };
651
652 static constexpr lane_attribution attribution(const std::size_t Sz) {
653 assert(Sz > 0);
654 if ((Sz & (Sz - 1)) == 0) {
655 // Sz is a power of 2
656 return lane_attribution::mod_power_of_2;
657 } else {
658 return lane_attribution::mod_other;
659 }
660 }
661
662protected:
663 std::size_t Size;
664 lane_attribution Attribution;
665
666private:
667 mutable std::unique_ptr<Lane[]> Lanes;
668
669 alignas(hardware_destructive_interference_size) mutable std::mutex BeforeLockAll;
670};
671
672class ConcurrentLanes : public MutexConcurrentLanes {
673 using Base = MutexConcurrentLanes;
674
675public:
676 using lane_id = Base::lane_id;
677 using Base::beforeLockAllBut;
678 using Base::beforeUnlockAllBut;
679 using Base::guard;
680 using Base::lock;
681 using Base::lockAllBut;
682 using Base::unlock;
683 using Base::unlockAllBut;
684
685 explicit ConcurrentLanes(const std::size_t Sz) : MutexConcurrentLanes(Sz) {}
686 ConcurrentLanes(const ConcurrentLanes&) = delete;
687 ConcurrentLanes(ConcurrentLanes&&) = delete;
688
689 lane_id threadLane() const {
690 return getLane(static_cast<std::size_t>(omp_get_thread_num()));
691 }
692
693 void setNumLanes(const std::size_t NumLanes) {
694 Base::setNumLanes(NumLanes == 0 ? omp_get_max_threads() : NumLanes);
695 }
696
697 unique_lock_type guard() const {
698 return Base::guard(threadLane());
699 }
700
701 void lock() const {
702 return Base::lock(threadLane());
703 }
704
705 void unlock() const {
706 return Base::unlock(threadLane());
707 }
708
709 void beforeLockAllBut() const {
710 return Base::beforeLockAllBut(threadLane());
711 }
712
713 void beforeUnlockAllBut() const {
714 return Base::beforeUnlockAllBut(threadLane());
715 }
716
717 void lockAllBut() const {
718 return Base::lockAllBut(threadLane());
719 }
720
721 void unlockAllBut() const {
722 return Base::unlockAllBut(threadLane());
723 }
724};
725
726#else
727
/**
 * A small utility class for implementing simple locks.
 *
 * This is the variant for sequential execution: nothing is locked.
 */
struct Lock {
    /** Empty placeholder for the lease handed out by acquire(). */
    class Lease {};

    // no locking if there is no parallel execution
    Lease acquire() {
        return Lease{};
    }

    void lock() {}

    /** @return always true */
    bool try_lock() {
        constexpr bool obtained = true;
        return obtained;
    }

    void unlock() {}
};
747
/**
 * A 'sequential' non-locking implementation for a spin lock.
 */
class SpinLock {
public:
    SpinLock() = default;

    /** Does nothing. */
    void lock() {}

    /** @return always true */
    bool try_lock() {
        constexpr bool obtained = true;
        return obtained;
    }

    /** Does nothing. */
    void unlock() {}
};
763
/**
 * A 'sequential' non-locking implementation for a read/write lock: the
 * phase markers do nothing and every attempt to obtain access succeeds.
 */
class ReadWriteLock {
public:
    ReadWriteLock() = default;

    // read phase markers, no-ops
    void start_read() {}
    void end_read() {}

    // write phase markers, no-ops
    void start_write() {}
    void end_write() {}

    /** @return always true */
    bool try_write() {
        constexpr bool granted = true;
        return granted;
    }

    /** @return always true */
    bool try_upgrade_to_write() {
        constexpr bool granted = true;
        return granted;
    }

    void downgrade_to_read() {}
};
786
/**
 * A 'sequential' non-locking implementation for an optimistic r/w lock.
 *
 * Every query, including is_write_locked(), answers true and every other
 * operation does nothing.
 */
class OptimisticReadWriteLock {
public:
    /** Empty placeholder for the lease of a read phase. */
    class Lease {};

    OptimisticReadWriteLock() = default;

    Lease start_read() {
        return Lease{};
    }

    /** @return always true */
    bool validate(const Lease& /*lease*/) {
        constexpr bool unchanged = true;
        return unchanged;
    }

    /** @return always true */
    bool end_read(const Lease& /*lease*/) {
        constexpr bool unchanged = true;
        return unchanged;
    }

    void start_write() {}

    /** @return always true */
    bool try_start_write() {
        constexpr bool granted = true;
        return granted;
    }

    /** @return always true */
    bool try_upgrade_to_write(const Lease& /*lease*/) {
        constexpr bool granted = true;
        return granted;
    }

    void abort_write() {}

    void end_write() {}

    /** @return always true */
    bool is_write_locked() const {
        constexpr bool locked = true;
        return locked;
    }
};
826
827struct ConcurrentLanes : protected SeqConcurrentLanes {
828 using Base = SeqConcurrentLanes;
829 using lane_id = SeqConcurrentLanes::lane_id;
830 using unique_lock_type = SeqConcurrentLanes::unique_lock_type;
831
832 using Base::lanes;
833 using Base::setNumLanes;
834
835 explicit ConcurrentLanes(std::size_t Sz = MAX_THREADS) : Base(Sz) {}
836 ConcurrentLanes(const ConcurrentLanes&) = delete;
837 ConcurrentLanes(ConcurrentLanes&&) = delete;
838
839 virtual ~ConcurrentLanes() {}
840
841 lane_id threadLane() const {
842 return 0;
843 }
844
845 unique_lock_type guard() const {
846 return Base::guard(threadLane());
847 }
848
849 void lock() const {
850 return Base::lock(threadLane());
851 }
852
853 void unlock() const {
854 return Base::unlock(threadLane());
855 }
856
857 void beforeLockAllBut() const {
858 return Base::beforeLockAllBut(threadLane());
859 }
860
861 void beforeUnlockAllBut() const {
862 return Base::beforeUnlockAllBut(threadLane());
863 }
864
865 void lockAllBut() const {
866 return Base::lockAllBut(threadLane());
867 }
868
869 void unlockAllBut() const {
870 return Base::unlockAllBut(threadLane());
871 }
872};
873
874#endif
875
876/**
877 * Obtains a reference to the lock synchronizing output operations.
878 */
879inline Lock& getOutputLock() {
880 static Lock outputLock;
881 return outputLock;
882}
883
884} // namespace souffle