// Distributed under the MIT License.
// See LICENSE.txt for details.

#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>  // for hardware_destructive_interference_size
#include <optional>
#include <ostream>
#include <type_traits>
#include <vector>

#include "Utilities/ErrorHandling/Error.hpp"
#include "Utilities/ForceInline.hpp"

namespace Parallel {
/*!
 * \brief A lockfree multi-producer multi-consumer unordered set.
 *
 * An unordered set where keys of type `T` are stored directly (i.e. not
 * hashed). As a result, there is the constraint that
 * `sizeof(T) <= sizeof(std::uint64_t)`, i.e. the key type `T` must be 8 bytes
 * or less.
 *
 * In order to provide fully lockfree semantics, a capacity must be chosen at
 * construction. If the capacity is reached, no more keys can be inserted. The
 * capacity must be a positive power of 2, i.e. `2^N` for `N >= 0`, which
 * allows slot indices to be computed with a cheap bitwise AND instead of a
 * modulo.
 *
 * Ideally the values of `T` are reasonably well distributed so as to avoid
 * collisions, but collisions are supported by linear probing. I.e., if two
 * keys would point to the same internal location, the location for the second
 * inserted key is obtained by linearly searching for the next empty slot. In
 * practice this means that if the load factor exceeds about 50%, performance
 * degradation should be expected. This can be resolved by creating an
 * unordered set with a larger maximum capacity. Linear probing makes
 * operations O(1) in the best case and O(2^N) (i.e. the capacity of the
 * unordered set) in the worst case, as illustrated below.
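 *
 * For example (an illustrative sketch, not taken from the original
 * documentation; `key` and `capacity` are placeholders), with a capacity of 8
 * the slot index of a key is
 * \code
 * // Both 3 and 11 map to slot 3 (11 & 7 == 3), so whichever of the two is
 * // inserted second is placed in the next empty slot by linear probing.
 * const std::uint64_t index = key bitand (capacity - 1);
 * \endcode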
 *
 * Users can optionally specify the sentinel value used to mark a slot as
 * empty by specifying the `EmptySlotValue`. By default the
 * `EmptySlotValue` is 0. It is undefined behavior if a key with the
 * value of the `EmptySlotValue` is inserted. No diagnostic is provided.
 *
 * \warning This class does not synchronize memory, which means that while all
 * operations on the container are atomic, it cannot be used to synchronize
 * data not contained in the set across different cores.
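 *
 * \par Example
 * A minimal usage sketch (illustrative, not taken from the original
 * documentation; the key values and capacity are arbitrary):
 * \code
 * // Capacity must be a positive power of two.
 * Parallel::LockfreeUnorderedSet<std::uint64_t> set(1024);
 * if (set.insert(42)) {
 *   // The key is now (or already was) in the set.
 * }
 * if (set.contains(42)) {
 *   // Note: another thread may erase the key immediately after this check.
 * }
 * if (set.erase(42)) {
 *   // This thread performed the erase.
 * }
 * \endcode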
 */
template <class T, std::uint64_t EmptySlotValue = 0,
          bool ForceCachelineAlignment = true>
class LockfreeUnorderedSet {
 private:
#ifdef __cpp_lib_hardware_interference_size
  static constexpr size_t cache_line_size_ =
      std::hardware_destructive_interference_size;
#else
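  // Fall back to 64 bytes, the cache line size on most current x86-64 cores
  // and many ARM cores.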
  static constexpr size_t cache_line_size_ = 64;
#endif

 public:
  static_assert(sizeof(T) <= sizeof(std::uint64_t));

  /*!
   * \brief Create a multi-producer multi-consumer unordered set that allows
   * at most `capacity` objects.
   *
   * \warning \p capacity must be a power of two greater than 0.
   */
  explicit LockfreeUnorderedSet(size_t capacity);
  // Delete copy and move constructors and assignment operators since this
  // class stores atomic variables needed for thread-safety.
  LockfreeUnorderedSet(const LockfreeUnorderedSet&) = delete;
  LockfreeUnorderedSet& operator=(const LockfreeUnorderedSet&) = delete;
  LockfreeUnorderedSet(LockfreeUnorderedSet&&) = delete;
  LockfreeUnorderedSet& operator=(LockfreeUnorderedSet&&) = delete;
  ~LockfreeUnorderedSet() = default;

  /*!
   * \brief Insert the \p key into the set.
   *
   * \param key The key to insert.
   * \param max_linear_probes The maximum number of linear probes to perform
   * before we give up. If `std::nullopt` (the default), then the maximum
   * number of linear probes is the container capacity (`capacity`).
   * This can be used to reduce the worst case linear probing cost from
   * O(capacity) to O(\p max_linear_probes) but users should be certain
   * this will not introduce bugs in their code.
   * \return `true` if the key was inserted or found in the set. Returns
   * `false` if we failed to insert the key because we reached
   * \p max_linear_probes.
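   *
   * For example (an illustrative sketch, not from the original documentation;
   * `set` and `key` are placeholders):
   * \code
   * // Probe at most 16 slots instead of the full capacity before giving up.
   * const bool inserted = set.insert(key, 16);
   * \endcode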
   */
  [[nodiscard]] bool insert(
      T key, std::optional<size_t> max_linear_probes = std::nullopt) noexcept;

  /*!
   * \brief Erase the \p key from the set.
   *
   * \param key The key to erase.
   * \param max_linear_probes The maximum number of linear probes to perform
   * before we give up. If `std::nullopt` (the default), then the maximum
   * number of linear probes is the container capacity (`capacity`).
   * This can be used to reduce the worst case linear probing cost from
   * O(capacity) to O(\p max_linear_probes) but users should be certain
   * this will not introduce bugs in their code.
   * \return `true` if the key was erased by this thread. Returns `false` if we
   * failed to erase the key because we reached \p max_linear_probes
   * or because another thread erased the key. We have no general way of
   * knowing why we could not find the \p key to erase.
   */
  [[nodiscard]] bool erase(
      T key, std::optional<size_t> max_linear_probes = std::nullopt) noexcept;

  /*!
   * \brief Check if the unordered set contains the \p key.
   *
   * \param key The key to check if it is contained in the unordered set.
   * \param max_linear_probes The maximum number of linear probes to perform
   * before we give up. If `std::nullopt` (the default), then the maximum
   * number of linear probes is the container capacity (`capacity`).
   * This can be used to reduce the worst case linear probing cost from
   * O(capacity) to O(\p max_linear_probes) but users should be certain
   * this will not introduce bugs in their code.
   * \return `true` if the \p key is found, `false` if not.
   *
   * \warning A \p key may be found in the unordered set and then immediately
   * erased by another thread.
   */
  [[nodiscard]] bool contains(T key, std::optional<size_t> max_linear_probes =
                                         std::nullopt) const noexcept;

  /// \brief Returns the capacity.
  constexpr size_t capacity() const noexcept { return entries_.size(); }

 private:
  [[nodiscard]] constexpr SPECTRE_ALWAYS_INLINE std::uint64_t
  compute_internal_key(const T key) const {
    return static_cast<std::uint64_t>(
        // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
        *reinterpret_cast<const std::conditional_t<
            sizeof(T) == 8, std::uint64_t,
            std::conditional_t<sizeof(T) == 4, std::uint32_t,
                               std::conditional_t<sizeof(T) == 2, std::uint16_t,
                                                  std::uint8_t>>>*>(&key));
  }
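
  // Note: a strictly well-defined alternative (an illustrative sketch, not
  // used by this class) would copy the key's object representation instead of
  // reinterpreting the pointer:
  //
  //   std::uint64_t internal_key = 0;
  //   std::memcpy(&internal_key, &key, sizeof(T));  // requires <cstring>
  //
  // On little-endian platforms this yields the same value as the cast above.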

  // Wrap the retrieval to easily support both cacheline-aligned and unaligned
  // data. We force inline, noexcept, and constexpr these functions to try to
  // ensure zero runtime overhead.
  [[nodiscard]] constexpr SPECTRE_ALWAYS_INLINE std::atomic<std::uint64_t>& get(
      const std::uint64_t index) noexcept {
    if constexpr (ForceCachelineAlignment) {
      return entries_[index].value;
    } else {
      return entries_[index];
    }
  }
  [[nodiscard]] constexpr SPECTRE_ALWAYS_INLINE const
      std::atomic<std::uint64_t>&
      get(const std::uint64_t index) const noexcept {
    if constexpr (ForceCachelineAlignment) {
      return entries_[index].value;
    } else {
      return entries_[index];
    }
  }

  // Since atomics may not always be lock free depending on alignment, we want
  // to catch issues where the hardware cannot guarantee that the atomic is
  // lock free. It is not inherently bad that we cannot guarantee that the
  // atomics are lock free at compile time, since we could check at runtime,
  // but it's nice to have the guarantee when possible. If for some reason the
  // guarantee isn't given, then likely forcing alignment of the individual
  // atomic variables would restore it. For example, some systems may only be
  // able to handle atomics on 32-byte word boundaries.
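  // A runtime check (an illustrative sketch of the alternative mentioned
  // above, not used here) could look like:
  //
  //   std::atomic<std::uint64_t> probe{};
  //   if (not probe.is_lock_free()) {
  //     ERROR("std::atomic<std::uint64_t> is not lock free on this system.");
  //   }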
  static_assert(std::atomic<std::uint64_t>::is_always_lock_free);

  struct alignas(cache_line_size_) AlignedEntry {
    std::atomic<std::uint64_t> value{0};
  };

  alignas(cache_line_size_)
      std::vector<std::conditional_t<ForceCachelineAlignment, AlignedEntry,
                                     std::atomic<std::uint64_t>>> entries_{};
  // Ensure we pad the end to avoid false sharing. Unlikely to happen because
  // of the layout, but better to be safe.
  char padding_[cache_line_size_] = {};  // NOLINT(modernize-avoid-c-arrays)
};

template <class T, std::uint64_t EmptySlotValue, bool ForceCachelineAlignment>
LockfreeUnorderedSet<T, EmptySlotValue, ForceCachelineAlignment>::
    LockfreeUnorderedSet(const size_t capacity)
    : entries_(capacity) {
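  // A positive power of two has exactly one bit set, so subtracting one flips
  // all of the lower bits and `capacity bitand (capacity - 1)` is zero. For
  // example, 8 bitand 7 == 0b1000 bitand 0b0111 == 0, while
  // 6 bitand 5 == 0b110 bitand 0b101 == 0b100 != 0.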
  if (capacity == 0 or (capacity bitand (capacity - 1)) != 0) {
    ERROR("The capacity must be a power of two larger than 0. Got "
          << capacity);
  }
  // Explicitly initialize every slot to the empty-slot sentinel.
  for (auto& t : entries_) {
    if constexpr (ForceCachelineAlignment) {
      t.value.store(EmptySlotValue, std::memory_order_relaxed);
    } else {
      t.store(EmptySlotValue, std::memory_order_relaxed);
    }
  }
}

template <class T, std::uint64_t EmptySlotValue, bool ForceCachelineAlignment>
bool LockfreeUnorderedSet<T, EmptySlotValue, ForceCachelineAlignment>::insert(
    const T key, const std::optional<size_t> max_linear_probes) noexcept {
  const size_t counter_end = max_linear_probes.value_or(entries_.size());

  const std::uint64_t internal_key = compute_internal_key(key);
  for (std::uint64_t index = internal_key, counter = 0; counter < counter_end;
       (void)++index, (void)++counter) {  // loop for linear probing
    index = index bitand (entries_.size() - 1);
    const std::uint64_t probed_internal_key =
        get(index).load(std::memory_order_relaxed);
    if (probed_internal_key == internal_key) {
      return true;  // set already contains value
    } else {
      if (probed_internal_key != EmptySlotValue) {
        // The entry is used by another key.
        continue;
      }
      // The entry is empty. Let's try to set it.
      std::uint64_t current_key_in_slot = EmptySlotValue;
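      // If the CAS fails because another thread concurrently inserted this
      // same key, the key is nevertheless in the set, so we still report
      // success.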
      if (not get(index).compare_exchange_strong(
              current_key_in_slot, internal_key, std::memory_order_relaxed,
              std::memory_order_relaxed) and
          current_key_in_slot != internal_key) {
        // Another thread just stole this slot from us. Try next slot.
        continue;
      }
      // Successful insert. Return.
      return true;
    }
  }
  return false;
}

template <class T, std::uint64_t EmptySlotValue, bool ForceCachelineAlignment>
bool LockfreeUnorderedSet<T, EmptySlotValue, ForceCachelineAlignment>::erase(
    const T key, const std::optional<size_t> max_linear_probes) noexcept {
  const size_t counter_end = max_linear_probes.value_or(entries_.size());

  const std::uint64_t internal_key = compute_internal_key(key);
  for (std::uint64_t index = internal_key, counter = 0; counter < counter_end;
       (void)++index, (void)++counter) {  // loop for linear probing
    index = index bitand (entries_.size() - 1);
    const std::uint64_t probed_internal_key =
        get(index).load(std::memory_order_relaxed);
    if (probed_internal_key == internal_key) {
      std::uint64_t current_key_in_slot = internal_key;
      if (not get(index).compare_exchange_strong(
              current_key_in_slot, EmptySlotValue, std::memory_order_relaxed,
              std::memory_order_relaxed) and
          current_key_in_slot != EmptySlotValue) {
        // If the CAS failed and the slot value is not the EmptySlotValue,
        // then we failed to erase. E.g. another thread could have erased this
        // value and a different key could then have been inserted into the
        // slot.
        // NOLINTNEXTLINE(readability-simplify-boolean-expr)
        return false;
      }
      return true;
    }
  }
  return false;
}

template <class T, std::uint64_t EmptySlotValue, bool ForceCachelineAlignment>
bool LockfreeUnorderedSet<T, EmptySlotValue, ForceCachelineAlignment>::contains(
    const T key, const std::optional<size_t> max_linear_probes) const noexcept {
  const size_t counter_end = max_linear_probes.value_or(entries_.size());

  const std::uint64_t internal_key = compute_internal_key(key);
  for (std::uint64_t index = internal_key, counter = 0; counter < counter_end;
       (void)++index, (void)++counter) {  // loop for linear probing
    index = index bitand (entries_.size() - 1);
    const std::uint64_t probed_internal_key =
        get(index).load(std::memory_order_relaxed);
    if (probed_internal_key == internal_key) {
      return true;
    }
  }
  return false;
}
}  // namespace Parallel