1707 lines
63 KiB
Zig
1707 lines
63 KiB
Zig
//! Copyright (c) 2026 Pascal Zittlau, BSD 3-Clause
|
|
//! Version: 0.1.0
|
|
//!
|
|
//! A thread-safe, fixed-capacity, open-addressing Hash Table.
|
|
//!
|
|
//! It uses Robin Hood hashing to minimize the Variance of Probe Sequence Lengths (PSL).
|
|
//! Deletion is handled via Backward-Shift. Unlike tombstone-based deletion, this maintains the
|
|
//! Robin Hood invariant by shifting subsequent entries back to fill gaps, keeping the table compact
|
|
//! and preventing PSL inflation over time.
|
|
//!
|
|
//! Concurrency Model:
|
|
//! - **Writers:** Use Shard-Level Locking. The table is partitioned into contiguous "blocks"
|
|
//! (shards). A writer acquires a mutex for the initial shard and acquires subsequent shard locks
|
|
//! if a probe sequence crosses a boundary. It is possible to deadlock under very specific
|
|
//! circumstances if a writer wraps around while locking. More Information below.
|
|
//! - **Readers:** Use Sequence Locking (Optimistic Concurrency Control). Readers do not take
|
|
//! locks; they read a version counter, perform the probe, and validate the version. This
|
|
//! provides extremely high throughput for read-heavy workloads.
|
|
//!
|
|
//! This implementation is **NOT** wait-free. Writers can starve readers if they constantly update
|
|
//! the same shards.
|
|
//!
|
|
//! Design Constraints:
|
|
//! - POD keys. In particular no slices.
|
|
//! - The table does not resize.
|
|
//! - Always has a power of two number of entries.
|
|
//! - Uses block sharding. Entries are grouped into contiguous chunks protected by the same lock to
|
|
//! optimize cache locality during linear probing.
|
|
//! - Always uses a power of two number of shards.
|
|
//!
|
|
//! ## Pointers and Memory Reclamation
|
|
//! This table does not provide a memory reclamation scheme.
|
|
//! - If `K` is a pointer, `eql` should very likely **not** dereference it. If it does, a concurrent
|
|
//! `remove` + `free` by a writer will cause the reader to segfault during the equality check.
|
|
//! - If `V` is a pointer, the data it points to is not guaranteed to be valid after `get()`
|
|
//! returns, as another thread may have removed and freed it.
|
|
//!
|
|
//! ## Choosing the Number of Shards
|
|
//!
|
|
//! The `num_shards` parameter balances throughput, memory overhead, and deadlock risk. Here are a
|
|
//! few considerations for choosing an appropriate shard count:
|
|
//! - Higher shard counts reduce lock contention among writers.
|
|
//! - Higher shard counts reduce retry probabilities for readers.
|
|
//! - Each shard is aligned to a cache line (64-128 bytes) to prevent false sharing. 1024 shards
|
|
//! consume ~64-128KB which increases cache pressure.
|
|
//! - This implementation uses a fixed-size buffer of size `fetch_version_array_size = 8` to track
|
|
//! shard versions during optimistic reads. If a probe sequence spans more shards than this
|
|
//! buffer, `get()` will return `ProbeLimitExceeded`. Smaller shards make this more likely.
|
|
//! - Smaller shards increase the number of locks that may be required for writers. This is
|
|
//! additional overhead.
|
|
//! - If more threads are used than shards are available the deadlock risk is increased.
|
|
//! - Deadlocks only occur during array wrap-around. More information below.
|
|
//!
|
|
//! Usually **64-1024** shards with 4-16 shards per thread are the "sweet spot".
|
|
//! `const num_shards = @min(@max(64, num_threads * 16), 1024);`
|
|
//!
|
|
//! ## Deadlock safety
|
|
//!
|
|
//! A deadlock can only occur if a circular dependency is created between shards. Because writers
|
|
//! always acquire locks in increasing index order, a deadlock is only possible during a
|
|
//! wrap-around, where a probe starts at the end of the `entries` array and continues at index 0.
|
|
//!
|
|
//! Why a deadlock is extremely unlikely:
|
|
//! - Robin Hood hashing keeps PSLs very short. Even at 90% load, the average PSL is ~4.5. In
|
|
//! particular the expected value for the length of a PSL grows with O(ln(n)) where `n` is the
|
|
//! capacity of the table.
|
|
//! - For a deadlock to occur, one thread must be probing across the wrap-around boundary
|
|
//!   (Shard[N-1] -> Shard[0]) while another thread simultaneously bridges it from the other side.
|
|
//! - Given that shards typically contain hundreds or thousands of slots and there are usually
|
|
//! dozens or hundreds of shards, the probability of multiple simultaneous probe sequences being
|
|
//! long enough to bridge the whole table is practically zero for well-sized tables and shards.
|
|
//!
|
|
//! The following experimental results show PSLs for various `load_factors` for a table with a
|
|
//! capacity of 2**21 (~2 million entries).
|
|
//!
|
|
//! load_factor | max PSL | avg PSL | median PSL
|
|
//! 0.50 | 12 | 0.50 | 0
|
|
//! 0.70 | 23 | 1.17 | 1
|
|
//! 0.80 | 31 | 2.01 | 1
|
|
//! 0.90 | 65 | 4.56 | 3
|
|
//! 0.95 | 131 | 9.72 | 6
|
|
//! 0.99 | 371 | 78.72 | 34
|
|
//!
|
|
//! ## Benchmarks
|
|
//!
|
|
//! You can run the included benchmark suite yourself to evaluate throughput and scaling on your own
|
|
//! hardware. Simply run the `main` function of this file (e.g., using `zig run <filename>.zig -O
|
|
//! ReleaseFast`). The benchmark compares this concurrent implementation against a standard
|
|
//! `std.AutoHashMap` wrapped in a `std.Thread.Mutex` across varying read/write/delete workloads.
|
|
//!
|
|
//! ## License
|
|
//!
|
|
//! Copyright 2026 Pascal Zittlau
|
|
//!
|
|
//! Redistribution and use in source and binary forms, with or without modification, are permitted
|
|
//! provided that the following conditions are met:
|
|
//! 1. Redistributions of source code must retain the above copyright notice, this list of
|
|
//! conditions and the following disclaimer.
|
|
//! 2. Redistributions in binary form must reproduce the above copyright notice, this list of
|
|
//! conditions and the following disclaimer in the documentation and/or other materials provided
|
|
//! with the distribution.
|
|
//!
|
|
//! 3. Neither the name of the copyright holder nor the names of its contributors may be used to
|
|
//! endorse or promote products derived from this software without specific prior written
|
|
//! permission.
|
|
//!
|
|
//! THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR
|
|
//! IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
|
|
//! FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
|
|
//! CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
//! CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
//! SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
//! THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
|
|
//! OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
//! POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
const std = @import("std");
|
|
const builtin = @import("builtin");
|
|
|
|
const atomic = std.atomic;
|
|
const math = std.math;
|
|
const mem = std.mem;
|
|
const testing = std.testing;
|
|
|
|
const assert = std.debug.assert;
|
|
|
|
const cache_line_size = atomic.cacheLineForCpu(builtin.cpu);
|
|
|
|
const Allocator = mem.Allocator;
|
|
|
|
/// Returns a default hash function for `K`.
///
/// Mirrors `std.hash_map` auto-hashing: when `K` has a unique in-memory
/// representation, the raw bytes are fed to Wyhash directly; otherwise
/// `std.hash.autoHash` walks the value field-by-field.
pub fn getAutoHashFn(comptime K: type) (fn (K) u64) {
    const Impl = struct {
        fn hash(key: K) u64 {
            if (!std.meta.hasUniqueRepresentation(K)) {
                // Padding bytes (or other non-canonical bits) would make the
                // byte-wise hash unstable, so hash field-by-field instead.
                var hasher = std.hash.Wyhash.init(0);
                std.hash.autoHash(&hasher, key);
                return hasher.final();
            }
            return std.hash.Wyhash.hash(0, std.mem.asBytes(&key));
        }
    };
    return Impl.hash;
}
|
|
|
|
/// Returns a default equality function for `K`, delegating to
/// `std.meta.eql` (deep, field-wise comparison).
pub fn getAutoEqlFn(comptime K: type) (fn (K, K) bool) {
    const Impl = struct {
        fn eql(lhs: K, rhs: K) bool {
            return std.meta.eql(lhs, rhs);
        }
    };
    return Impl.eql;
}
|
|
|
|
/// Convenience wrapper around `HashMapConcurrent` that derives `hash` and `eql`
/// automatically, using the default `Config`.
///
/// Rejects key types that `isSafe` classifies as tearable at compile time,
/// because the lock-free readers of this table may observe partially written
/// keys.
pub fn AutoHashMapConcurrent(comptime K: type, comptime V: type) type {
    if (!isSafe(K)) {
        @compileError("AutoHashMapConcurrent: Key type '" ++ @typeName(K) ++
            "' is potentially tearable (e.g. contains slices or is larger than a machine word). " ++
            "Optimistic readers do not take locks and might see a partially updated key. " ++
            "This causes automatically generated 'hash' and 'eql' functions to produce garbage " ++
            "results or trigger memory safety violations (segfaults on torn slices). " ++
            "To use this safely, you must provide your own robust 'hash' and 'eql' " ++
            "functions and use 'HashMapConcurrent' directly.");
    }
    const hash = getAutoHashFn(K);
    const eql = getAutoEqlFn(K);
    return HashMapConcurrent(K, V, hash, eql, .{});
}
|
|
|
|
// Conservative tearing-safety classification: only word-sized scalars and
// non-slice pointers are accepted. All composite types are rejected by
// default, and values no larger than `usize` are assumed to never tear.
fn isSafe(comptime T: type) bool {
    return switch (@typeInfo(T)) {
        .bool => true,
        .int => |info| info.bits <= @bitSizeOf(usize),
        .float => |info| info.bits <= @bitSizeOf(usize),
        .@"enum" => |info| @sizeOf(info.tag_type) <= @sizeOf(usize),
        // Slices are a (ptr, len) pair and can tear between the two words.
        .pointer => |info| info.size != .slice,
        // TODO: add more safe ones as needed
        else => false,
    };
}
|
|
|
|
/// Compile-time tuning knobs for `HashMapConcurrent`.
pub const Config = struct {
    /// The maximum load factor before the table is considered "at capacity". Default: 0.8
    /// This is not a strict maximum — only `atCapacity()` consults it. The table still
    /// continues to perform okay up to about 95% fill. At 99% the performance degrades
    /// sharply. Even higher and the risk of deadlocks increases. See the module
    /// documentation for more information or run a benchmark.
    max_fill_factor: f64 = 0.8,
    /// When fetching we need to save versions of the shards we visit. This is the maximum
    /// number of shards we traverse in search for the entry. Visiting more shards than this
    /// could happen under the following circumstances:
    /// - the hash function is bad or
    /// - the shards are small or
    /// - the fill factor is close to 1
    /// If exceeded, `getChecked` returns `ProbeLimitExceeded`; `getBuffer` accepts a
    /// caller-supplied buffer of any size as an escape hatch.
    fetch_version_array_size: u64 = 8,
};
|
|
|
|
pub fn HashMapConcurrent(
|
|
/// The key has to be carefully chosen such that the `hash` and `eql` functions work correctly.
|
|
/// In particular the combination shouldn't be susceptible to torn reads.
|
|
K: type,
|
|
V: type,
|
|
hash: fn (K) u64,
|
|
eql: fn (K, K) bool,
|
|
config: Config,
|
|
) type {
|
|
return struct {
|
|
const Self = @This();
|
|
|
|
/// A single key/value slot as stored in the backing `entries` array.
pub const Entry = struct {
    key: K,
    value: V,
};

// `dists[i]` stores the probe sequence length of `entries[i]` *plus one*,
// so that 0 can be reserved to mean "slot is empty".
const EMPTY_DIST = 0;
// Stored distances are clamped to fit in a u8. A stored value of 255 means
// "saturated": the true distance must be recomputed from the key's hash.
const SATURATED_DIST = 255;
|
|
|
|
/// How many entries can be saved in the table at most.
capacity: usize,
/// Number of used entries.
// This is heap allocated with a complete cache line to remove false sharing when other
// threads just try to access members of the HashTable struct.
count: *align(cache_line_size) atomic.Value(usize),

// NOTE: No slices because we don't want the bounds checks. All accesses are protected by
// masking of the excess bits with the masks below.

/// The entries saved in the table.
/// `entries.len == capacity`
entries: [*]align(cache_line_size) Entry,
/// The distances of the entries. `dists[i]` refers to the distance of `entries[i]`.
/// A value of `EMPTY_DIST` (0) marks an empty slot; stored distances are PSL + 1.
/// `dists.len == entries.len == capacity`
dists: [*]align(cache_line_size) u8,
/// Locks for the shards. `locks.len == num_shards`
locks: [*]align(cache_line_size) ShardLock,

/// Number of shards (always a power of two).
num_shards: usize,

/// Maximum capacity before the table signals it is full in `atCapacity()`.
/// Pre-calculated to avoid math in hot paths.
max_count_threshold: usize,

/// `capacity - 1`; AND with this to wrap an entry index.
entry_mask: usize,
/// `num_shards - 1`; AND with this to wrap a shard index.
shard_mask: usize,
/// shift **right** to get shard index.
shard_shift: u6,
/// Used to efficiently detect shard boundaries during linear probing.
/// `(entries_per_shard - 1)`; an index with all these bits zero sits on a boundary.
partition_mask: usize,
|
|
|
|
/// Initializes the Hash Table.
///
/// `capacity`: The number of elements the table should hold. Must be a power of two.
///             If you just have a minimum capacity that should fit, use
///             `capacityForMin` to get a value that respects the fill factor and is a
///             power of two.
///
/// `num_shards`: Number of mutexes. Must be a power of two and `<= capacity`.
///               See the module documentation for guidance on selecting this value.
///
/// bytes_used ≈
///       @sizeOf(ConcurrentHashTable)    // structure
///     + capacity * @sizeOf(K)           // keys
///     + capacity * @sizeOf(V)           // values
///     + capacity                        // distances
///     + num_shards * cache_line_size    // locks
///     + cache_line_size                 // count
pub fn init(allocator: Allocator, capacity: usize, num_shards: usize) !Self {
    assert(math.isPowerOfTwo(capacity));
    assert(math.isPowerOfTwo(num_shards));
    assert(num_shards > 0);
    assert(capacity >= num_shards);

    // All arrays are cache-line aligned so shard-local accesses do not
    // false-share across shard boundaries.
    const entries = try allocator.alignedAlloc(
        Entry,
        .fromByteUnits(cache_line_size),
        capacity,
    );
    errdefer allocator.free(entries);

    const dists = try allocator.alignedAlloc(
        u8,
        .fromByteUnits(cache_line_size),
        capacity,
    );
    errdefer allocator.free(dists);
    // All-empty: entries themselves stay undefined until their slot is used.
    @memset(dists, EMPTY_DIST);

    // Enforce that `count` has a complete cache line for itself to remove false sharing.
    const count_line = try allocator.alignedAlloc(
        u8,
        .fromByteUnits(cache_line_size),
        cache_line_size,
    );
    errdefer allocator.free(count_line);
    const count = mem.bytesAsValue(atomic.Value(usize), count_line);
    count.*.raw = 0;

    const shards = try allocator.alignedAlloc(
        ShardLock,
        .fromByteUnits(cache_line_size),
        num_shards,
    );
    errdefer allocator.free(shards);
    @memset(shards, .{});

    // Both capacity and num_shards are powers of two, so this divides evenly.
    const entries_per_shard = capacity / num_shards;
    assert(math.isPowerOfTwo(entries_per_shard));
    const shift = math.log2_int(usize, entries_per_shard);

    const capacity_f: f64 = @floatFromInt(capacity);
    return .{
        .entries = entries.ptr,
        .dists = dists.ptr,
        .locks = shards.ptr,
        .capacity = capacity,
        .num_shards = num_shards,
        .count = count,
        .entry_mask = capacity - 1,
        .shard_mask = num_shards - 1,
        .shard_shift = @intCast(shift),
        .partition_mask = entries_per_shard - 1,
        .max_count_threshold = @intFromFloat(capacity_f * config.max_fill_factor),
    };
}
|
|
|
|
/// Smallest power-of-two capacity that can hold `min_capacity` entries
/// while staying under `config.max_fill_factor`.
pub fn capacityForMin(min_capacity: usize) usize {
    const wanted: f64 = @floatFromInt(min_capacity);
    const scaled: u64 = @intFromFloat(wanted / config.max_fill_factor);
    return math.ceilPowerOfTwoAssert(u64, scaled);
}
|
|
|
|
/// Frees all memory owned by the table. **Not** threadsafe.
pub fn deinit(ht: *Self, allocator: Allocator) void {
    const cap = ht.capacity;
    allocator.free(ht.entries[0..cap]);
    allocator.free(ht.dists[0..cap]);
    allocator.free(ht.locks[0..ht.num_shards]);

    // `count` was carved out of a whole cache line in `init`; rebuild that
    // slice to hand the same allocation back.
    const count_line: [*]align(cache_line_size) u8 = @ptrCast(ht.count);
    allocator.free(count_line[0..cache_line_size]);

    ht.* = undefined;
}
|
|
|
|
/// Thread-safely removes all entries from the table.
/// Acquires all shard locks (in ascending order, so it cannot deadlock with
/// writers) to ensure a consistent, fully-empty result.
pub fn clearRetainingCapacity(ht: *Self) void {
    var shard: usize = 0;
    while (shard < ht.num_shards) : (shard += 1) ht.locks[shard].lock();
    defer for (0..ht.num_shards) |s| ht.locks[s].unlock();

    // Emptying the distance array is all that is needed; entry payloads are
    // treated as garbage once their dist is EMPTY_DIST.
    @memset(ht.dists[0..ht.capacity], EMPTY_DIST);
    ht.count.store(0, .release);
}
|
|
|
|
/// Returns true once the current number of entries strictly exceeds the
/// pre-calculated threshold based on `max_fill_factor`.
/// Advisory only: insertion functions do not consult this themselves.
pub fn atCapacity(ht: *Self) bool {
    return ht.count.load(.monotonic) > ht.max_count_threshold;
}
|
|
|
|
/// Returns the current load factor of the table (0.0 to 1.0).
pub fn loadFactor(ht: *Self) f64 {
    const count: f64 = @floatFromInt(ht.count.load(.monotonic));
    // BUG FIX: `ht.entries` is a many-item pointer (`[*]Entry`), which has no
    // `len` field — `ht.entries.len` fails to compile as soon as this function
    // is referenced. The table size lives in `ht.capacity`.
    const capacity: f64 = @floatFromInt(ht.capacity);
    return count / capacity;
}
|
|
|
|
/// Maps an entry index to the index of the shard lock protecting it.
inline fn getShardIndex(ht: *const Self, entry_idx: usize) usize {
    const shard = entry_idx >> ht.shard_shift;
    return shard & ht.shard_mask;
}
|
|
|
|
/// Inserts `key`/`value`. If the key exists, it **is overwritten**.
///
/// This function asserts that the table has enough physical capacity to perform the
/// insertion. Use `atCapacity()` to check load factors, or `tryPut()` if you need
/// to handle absolute capacity limits gracefully.
pub fn put(ht: *Self, key: K, value: V) void {
    // Thin wrapper: discard the previous value.
    _ = ht.fetchPut(key, value);
}
|
|
|
|
/// Inserts or updates an entry. If the key already exists, the value is updated.
/// Returns the previous value if the key was present, otherwise null.
///
/// This function asserts that the table has enough physical capacity; a full
/// table is a programming error here (use `tryFetchPut` to handle it).
pub fn fetchPut(ht: *Self, key: K, value: V) ?V {
    return ht.tryFetchPut(key, value) catch unreachable;
}
|
|
|
|
pub const TryPutError = error{TableFull};

/// Attempts to insert or update a key-value pair.
///
/// If the table is physically full (the probe sequence wraps around), it returns
/// `TryPutError.TableFull`.
pub fn tryPut(ht: *Self, key: K, value: V) TryPutError!void {
    // Thin wrapper: discard the previous value, propagate TableFull.
    _ = try ht.tryFetchPut(key, value);
}
|
|
|
|
/// Attempts to insert or update an entry, returning the previous value if it existed.
///
/// This is the primary insertion logic for the table. It handles:
/// 1. Shard-level locking (acquiring additional locks if the probe crosses boundaries).
/// 2. Robin Hood swaps to minimize probe sequence length.
/// 3. Wrap-around detection to prevent infinite loops in a 100% full table.
///
/// Returns:
/// - `V`: The previous value if the key was already present.
/// - `null`: If the key was newly inserted.
/// - `TryPutError.TableFull`: If no space could be found.
pub fn tryFetchPut(ht: *Self, key: K, value: V) TryPutError!?V {
    const incoming = Entry{ .key = key, .value = value };
    // `current` is whatever entry we are currently placing; a Robin Hood swap
    // below may replace it with an evicted resident entry mid-probe.
    var current = incoming;
    var idx = hash(current.key) & ht.entry_mask;
    // Distance from the ideal position for the `current` entry we are trying to insert.
    var dist: u64 = 0;

    const start_lock = ht.getShardIndex(idx);
    var end_lock = start_lock;
    ht.locks[start_lock].lock();

    // Unlock all touched shards. Locks were acquired in increasing (wrapping)
    // order, so [start_lock..end_lock] is exactly the held range.
    defer {
        var i = start_lock;
        while (true) {
            ht.locks[i].unlock();
            if (i == end_lock) break;
            i = (i + 1) & ht.shard_mask;
        }
    }

    while (true) {
        const stored_dist = ht.dists[idx];

        if (stored_dist == EMPTY_DIST) {
            // Free slot: claim it. Stored distances are PSL + 1 (0 = empty),
            // clamped at SATURATED_DIST.
            ht.entries[idx] = current;
            ht.dists[idx] = @intCast(@min(dist + 1, SATURATED_DIST));
            _ = ht.count.fetchAdd(1, .monotonic);
            return null;
        } else if (eql(ht.entries[idx].key, current.key)) {
            // Key already present: update in place. A match can only be the
            // originally inserted key (an evicted resident cannot re-match).
            assert(eql(current.key, incoming.key));
            const old_val = ht.entries[idx].value;
            ht.entries[idx].value = current.value;
            return old_val;
        }

        // Collision, so apply Robin Hood logic.
        var existing_dist: u64 = stored_dist - 1;
        if (stored_dist == SATURATED_DIST) {
            @branchHint(.cold);
            // If saturated, we must recompute the hash to find the real distance.
            const existing_ideal = hash(ht.entries[idx].key) & ht.entry_mask;
            existing_dist = (idx -% existing_ideal) & ht.entry_mask;
        }

        if (dist > existing_dist) {
            // The entry being inserted is further from home than the resident:
            // swap, then continue probing with the evicted resident.
            mem.swap(Entry, &current, &ht.entries[idx]);
            ht.dists[idx] = @intCast(@min(dist + 1, SATURATED_DIST));
            dist = existing_dist;
        }

        idx = (idx + 1) & ht.entry_mask;
        dist += 1;

        if ((idx & ht.partition_mask) == 0) {
            @branchHint(.unlikely);
            // Since we move linearly, the new shard is simply the next one.
            const next_lock = (end_lock + 1) & ht.shard_mask;
            // Wrapping back to the start shard means every slot was inspected.
            if (next_lock == start_lock) return TryPutError.TableFull;

            ht.locks[next_lock].lock();
            end_lock = next_lock;
        }
    }
}
|
|
|
|
/// Fetches a value from the table.
///
/// This asserts that the probe sequence does not span more shards than the internal
/// tracking buffer (`config.fetch_version_array_size`) allows. For properly sized
/// tables and shards, this is the standard lookup method.
pub fn get(ht: *const Self, key: K) ?V {
    return ht.getChecked(key) catch unreachable;
}
|
|
|
|
pub const GetError = error{ProbeLimitExceeded};

/// Performs an optimistic concurrent lookup.
///
/// Use this if you are operating at extreme load factors (>99%), with a small capacity, or
/// with a hash function that might produce very long clusters.
///
/// Returns:
/// - `null` if the key is not found.
/// - `GetError.ProbeLimitExceeded` if the probe sequence spans more than
///   `fetch_version_array_size` shards. This can happen with very poor hash distributions
///   or extremely high load factors. Use `getBuffer` to provide a larger stack-allocated
///   buffer if this occurs.
pub fn getChecked(ht: *const Self, key: K) GetError!?V {
    // Stack-allocated scratch space for per-shard sequence versions.
    var versions: [config.fetch_version_array_size]u64 = undefined;
    return ht.getBuffer(key, &versions);
}
|
|
|
|
/// Low-level lookup allowing the caller to provide a version buffer.
///
/// Use this with a bigger buffer if `get()` returns `ProbeLimitExceeded` due to high shard
/// density.
///
/// This does not acquire any mutexes. It reads a version counter before and after reading
/// the data. If the version changed (meaning a writer touched the shard), the read is
/// retried.
pub fn getBuffer(ht: *const Self, key: K, versions: []u64) GetError!?V {
    assert(versions.len > 0);

    // Number of shard versions recorded for the current attempt.
    var shard_count: usize = 0;
    var first_shard_idx: usize = undefined;

    retry: while (true) {
        var idx = hash(key) & ht.entry_mask;
        var dist: u64 = 0;

        var current_shard = ht.getShardIndex(idx);
        first_shard_idx = current_shard;
        // Snapshot the version *before* touching any slot of this shard.
        versions[0] = ht.locks[current_shard].readBegin();
        shard_count = 1;

        const val: ?V = while (true) {
            const stored_dist = ht.dists[idx];
            if (stored_dist == EMPTY_DIST) break null;
            // Robin Hood invariant check: a resident closer to its home than
            // our current distance means the key cannot appear further along.
            // Saturated distances are inconclusive and never end the probe.
            if (stored_dist < (dist + 1) and stored_dist != SATURATED_DIST) break null;

            // Copy key and value out before comparing — the slot may be
            // concurrently rewritten; validation below catches that case.
            const slot = &ht.entries[idx];
            const k = slot.key;
            const v = slot.value;

            if (eql(k, key)) break v;

            idx = (idx + 1) & ht.entry_mask;
            dist += 1;

            if ((idx & ht.partition_mask) == 0) {
                @branchHint(.unlikely);
                if (shard_count == versions.len) return GetError.ProbeLimitExceeded;
                current_shard = (current_shard + 1) & ht.shard_mask;
                versions[shard_count] = ht.locks[current_shard].readBegin();
                shard_count += 1;
            }
        };

        // We need a memory barrier here to ensure the data reads from the entries aren't
        // reordered after the sequence version validations below.
        loadFence();

        // Validate all traversed shards
        var check_idx = first_shard_idx;
        for (0..shard_count) |i| {
            if (!ht.locks[check_idx].readValid(versions[i])) {
                @branchHint(.unlikely);
                // A writer touched this shard mid-read; discard and retry.
                atomic.spinLoopHint();
                continue :retry;
            }
            check_idx = (check_idx + 1) & ht.shard_mask;
        }
        return val;
    }
}
|
|
|
|
/// Removes an entry from the table and performs a backward-shift to fill the gap.
/// A missing key is a no-op; use `fetchRemove` to observe the removed value.
pub fn remove(ht: *Self, key: K) void {
    _ = ht.fetchRemove(key);
}
|
|
|
|
/// Removes an entry from the table and performs a backward-shift to fill the gap.
/// Returns the value of the removed entry, or null if the key was not found.
pub fn fetchRemove(ht: *Self, key: K) ?V {
    var idx = hash(key) & ht.entry_mask;
    var dist: u64 = 0;

    const start_lock = ht.getShardIndex(idx);
    var end_lock = start_lock;
    ht.locks[start_lock].lock();

    // Release all shards locked during probe and shift.
    defer {
        var i = start_lock;
        while (true) {
            ht.locks[i].unlock();
            if (i == end_lock) break;
            i = (i + 1) & ht.shard_mask;
        }
    }

    // Find slot
    while (true) {
        const stored_dist = ht.dists[idx];

        if (stored_dist == EMPTY_DIST) return null;
        // Robin Hood invariant check: a resident closer to home than our
        // probe distance means the key cannot appear further along.
        if (stored_dist < (dist + 1) and stored_dist != SATURATED_DIST) return null;
        if (eql(ht.entries[idx].key, key)) break;

        idx = (idx + 1) & ht.entry_mask;
        dist += 1;

        if ((idx & ht.partition_mask) == 0) {
            @branchHint(.unlikely);
            const next_lock = (end_lock + 1) & ht.shard_mask;
            if (next_lock == start_lock) return null; // Wrap-around safety

            ht.locks[next_lock].lock();
            end_lock = next_lock;
        }
    }
    const removed_value = ht.entries[idx].value;
    _ = ht.count.fetchSub(1, .monotonic);

    // Backward Shift Deletion
    while (true) {
        // Mark current slot as empty (temporarily, will be filled if shifting)
        ht.dists[idx] = EMPTY_DIST;

        const next_idx = (idx + 1) & ht.entry_mask;

        if ((next_idx & ht.partition_mask) == 0) {
            @branchHint(.unlikely);
            // The shift may cross into the next shard: lock it before reading
            // its slots. If it wraps back to the start shard, we already hold it.
            const next_lock = (end_lock + 1) & ht.shard_mask;
            if (next_lock != start_lock) {
                @branchHint(.unlikely);
                ht.locks[next_lock].lock();
                end_lock = next_lock;
            }
        }

        const next_dist_stored = ht.dists[next_idx];
        // If the next element is empty (0) the item is at its ideal position, we can't
        // shift it back.
        if (next_dist_stored == EMPTY_DIST or next_dist_stored == 1) {
            return removed_value;
        }

        // Shift back into the hole
        ht.entries[idx] = ht.entries[next_idx];
        var new_dist: u64 = next_dist_stored - 1;
        if (next_dist_stored == SATURATED_DIST) {
            @branchHint(.cold);
            // Recompute real distance
            const ideal = hash(ht.entries[next_idx].key) & ht.entry_mask;
            const real_dist = (next_idx -% ideal) & ht.entry_mask;
            new_dist = real_dist;
        }

        ht.dists[idx] = @intCast(@min(new_dist, SATURATED_DIST));
        idx = next_idx;
    }
}
|
|
|
|
/// Removes all entries from the table using lock-crabbing. This *is thread-safe* but
/// depending on the usage the table might never be fully empty.
///
/// It locks shards sequentially (holding at most two at a time) to avoid stalling the
/// entire table while preventing elements from shifting across boundaries. Concurrent
/// writers are still allowed, though depending on where the cleaning logic is, their entry
/// might be overwritten shortly after.
pub fn clear(ht: *Self) void {
    ht.locks[0].lock();
    var current_shard: usize = 0;
    // Per-shard tally, flushed to `count` before the shard lock is dropped.
    var elements_cleared: usize = 0;

    for (0..ht.capacity) |i| {
        // Check if we crossed a shard boundary
        if (i > 0 and (i & ht.partition_mask) == 0) {
            if (elements_cleared > 0) {
                _ = ht.count.fetchSub(elements_cleared, .monotonic);
                elements_cleared = 0;
            }

            // Boundaries occur only at multiples of entries_per_shard below
            // capacity, so `next_shard` stays < num_shards here.
            const next_shard = current_shard + 1;
            if (next_shard < ht.num_shards) {
                ht.locks[next_shard].lock();
            }
            // Lock-crabbing: acquire the next shard before releasing this one.
            ht.locks[current_shard].unlock();
            current_shard = next_shard;
        }

        if (ht.dists[i] != EMPTY_DIST) {
            ht.dists[i] = EMPTY_DIST;
            elements_cleared += 1;
        }
    }

    // Flush remaining cleared elements and release the last lock
    if (elements_cleared > 0) {
        _ = ht.count.fetchSub(elements_cleared, .monotonic);
    }
    ht.locks[current_shard].unlock();
}
|
|
|
|
/// Iterator that walks the table slot-by-slot while holding the lock of the
/// shard currently being traversed (lock coupling). See `lockingIterator`.
pub const LockingIterator = struct {
    ht: *Self,
    // Next slot index to inspect, in [0, capacity].
    current_index: usize,
    // Index of the shard whose lock is currently held; set to `num_shards`
    // once released (sentinel meaning "no lock held").
    current_shard: usize,

    /// Releases any shard locks held by the iterator.
    /// This MUST be called if you stop iterating before `next()` returns `null`.
    /// It is safe to call this even if the iterator is fully exhausted.
    pub fn deinit(self: *LockingIterator) void {
        if (self.current_shard < self.ht.num_shards) {
            self.ht.locks[self.current_shard].unlock();
            self.current_shard = self.ht.num_shards; // Prevent double-unlock
        }
    }

    /// Returns the next occupied entry, or `null` when the table is exhausted
    /// (at which point the last lock has been released automatically).
    pub fn next(self: *LockingIterator) ?Entry {
        while (self.current_index < self.ht.capacity) {
            // Check if we crossed a shard boundary
            if (self.current_index > 0 and (self.current_index & self.ht.partition_mask) == 0) {
                const next_shard = self.current_shard + 1;
                if (next_shard < self.ht.num_shards) {
                    self.ht.locks[next_shard].lock();
                }
                // Lock-crabbing: take the next lock before dropping this one.
                self.ht.locks[self.current_shard].unlock();
                self.current_shard = next_shard;
            }

            // Copy out under the shard lock, then advance.
            const dist = self.ht.dists[self.current_index];
            const entry = self.ht.entries[self.current_index];
            self.current_index += 1;

            if (dist != EMPTY_DIST) {
                return entry;
            }
        }

        // Reached the end, release the last lock
        self.deinit();
        return null;
    }
};
|
|
|
|
/// Returns a thread-safe iterator using lock coupling. This will **not** provide a
/// consistent state because concurrent writers are still allowed. Though no elements will
/// be returned twice.
///
/// You MUST either exhaust the iterator (until `next()` returns `null`) or explicitly call
/// `it.deinit()`(or both) if you break early. Otherwise, a lock will be leaked and the
/// table will deadlock.
///
/// Do not call a locking function on the same table while an iterator is active in the same
/// thread.
pub fn lockingIterator(ht: *Self) LockingIterator {
    // Lock the first shard up front; `next()`/`deinit()` manage the rest.
    ht.locks[0].lock();
    return .{
        .ht = ht,
        .current_index = 0,
        .current_shard = 0,
    };
}
|
|
|
|
/// Lock-free iterator using per-slot optimistic reads. May miss or repeat
/// entries under concurrent writes, but never returns a torn entry.
/// See `approximateIterator`.
pub const ApproximateIterator = struct {
    ht: *const Self,
    // Next slot index to inspect, in [0, capacity].
    current_index: usize,

    /// Returns the next occupied entry observed, or `null` at the end.
    pub fn next(self: *ApproximateIterator) ?Entry {
        while (self.current_index < self.ht.capacity) {
            defer self.current_index += 1;

            const shard_idx = self.ht.getShardIndex(self.current_index);
            var dist: u8 = undefined;
            var entry: Entry = undefined;

            // Optimistic read loop for this specific slot
            while (true) {
                const version = self.ht.locks[shard_idx].readBegin();
                dist = self.ht.dists[self.current_index];

                // Only copy the entry if the slot isn't empty
                if (dist != EMPTY_DIST) {
                    entry = self.ht.entries[self.current_index];
                }

                // Prevent the entry read from being reordered past validation.
                loadFence();

                if (self.ht.locks[shard_idx].readValid(version)) {
                    break; // Successfully read a consistent state
                }
                atomic.spinLoopHint();
            }

            if (dist != EMPTY_DIST) {
                return entry;
            }
        }

        return null;
    }
};
|
|
|
|
/// Returns a non-locking, approximate iterator using optimistic concurrency control.
///
/// Because this iterator does not hold locks, concurrent `put` and `remove` operations can
/// shift elements backwards or forwards. As a result, this iterator may miss entries or
/// return the same entry multiple times. However, it strictly guarantees that any returned
/// `Entry` is internally consistent (no torn reads).
///
/// It is perfectly safe to break out of this iterator early, as no locks are held.
pub fn approximateIterator(ht: *const Self) ApproximateIterator {
    return .{
        .ht = ht,
        .current_index = 0,
    };
}
|
|
|
|
/// Collects health statistics about the table (PSL max/avg/median, load factor).
/// This is a slow, non-atomic operation — it takes no locks, so results under
/// concurrent writes are approximate. Use only for monitoring or debugging.
pub fn collectStatistics(ht: *const Self) Statistics {
    var total_psl: usize = 0;
    var max_psl: usize = 0;
    const count: usize = ht.count.load(.acquire);

    // Histogram for median calculation.
    const hist_size = 1024; // If more are needed you likely have other worse problems
    var psl_histogram = [_]usize{0} ** (hist_size);
    // Occupied slots actually seen during the scan; may differ from `count`
    // under concurrent modification.
    var actual_count: usize = 0;

    for (0..ht.capacity) |i| {
        const slot = ht.entries[i];
        const k = slot.key;
        if (ht.dists[i] != EMPTY_DIST) {
            // True PSL from the hash, not the (possibly saturated) dists byte.
            const ideal = hash(k) & ht.entry_mask;
            const psl = (i -% ideal) & ht.entry_mask;

            total_psl += psl;
            actual_count += 1;
            if (psl > max_psl) max_psl = psl;

            // Anything beyond the histogram is clamped into the last bucket.
            const bucket = @min(psl, psl_histogram.len - 1);
            psl_histogram[bucket] += 1;
        }
    }

    // Calculate Median from Histogram
    var median_psl: usize = 0;
    if (actual_count > 0) {
        const target = actual_count / 2;
        var accumulated: usize = 0;
        for (psl_histogram, 0..) |freq, psl_val| {
            accumulated += freq;
            if (accumulated >= target) {
                median_psl = psl_val;
                break;
            }
        }
    }

    const count_f: f64 = @floatFromInt(count);

    return .{
        .capacity = ht.capacity,
        .count = count,
        .load_factor = count_f / @as(f64, @floatFromInt(ht.capacity)),
        .max_psl = max_psl,
        .avg_psl = if (count > 0) @as(f64, @floatFromInt(total_psl)) / count_f else 0,
        .median_psl = median_psl,
        .num_shards = ht.num_shards,
    };
}
|
|
|
|
/// Exhaustively validates the internal state of the table.
|
|
/// This is slow, not threadsafe, and should only really be used in tests.
|
|
pub fn verifyIntegrity(ht: *const Self) !void {
|
|
assert(builtin.is_test);
|
|
var actual_total_count: usize = 0;
|
|
var count: usize = 0;
|
|
|
|
for (0..ht.capacity) |i| {
|
|
const entry = ht.entries[i];
|
|
if (ht.dists[i] == EMPTY_DIST) continue;
|
|
|
|
actual_total_count += 1;
|
|
count += 1;
|
|
|
|
// Ensure the key can actually be found by the get() logic
|
|
const found_val = try ht.getChecked(entry.key);
|
|
try testing.expectEqual(entry.value, found_val);
|
|
|
|
// Validate Robin Hood Invariant. A slot's PSL cannot be less than (next_slot.PSL - 1)
|
|
const next_idx = (i + 1) & ht.entry_mask;
|
|
const next_stored_dist = ht.dists[next_idx];
|
|
if (next_stored_dist != EMPTY_DIST) {
|
|
const current_ideal = hash(entry.key) & ht.entry_mask;
|
|
const next_ideal = hash(ht.entries[next_idx].key) & ht.entry_mask;
|
|
const current_psl = (i -% current_ideal) & ht.entry_mask;
|
|
const next_psl = (next_idx -% next_ideal) & ht.entry_mask;
|
|
try testing.expect(next_psl <= current_psl + 1);
|
|
}
|
|
}
|
|
|
|
try testing.expectEqual(ht.count.load(.acquire), count);
|
|
}
|
|
};
|
|
}
|
|
|
|
/// Point-in-time health metrics for a table, produced by `collectStatistics`.
pub const Statistics = struct {
    /// Total number of slots in the table.
    capacity: usize,
    /// Number of occupied slots according to the atomic counter.
    count: usize,
    /// `count / capacity`.
    load_factor: f64,
    /// Largest Probe Sequence Length observed during the scan.
    max_psl: usize,
    /// Mean PSL over the table's entries (0 when empty).
    avg_psl: f64,
    /// Median PSL, derived from a bucketed histogram (buckets capped at 1023).
    median_psl: usize,
    /// Number of lock shards the table was created with.
    num_shards: usize,
};
|
|
|
|
/// Emits an optimal architecture-specific LoadLoad barrier.
/// Required for the read-side of sequence locks to ensure the data reads are not reordered before
/// the first version read, or after the second version read.
inline fn loadFence() void {
    switch (builtin.cpu.arch) {
        .x86_64, .x86 => {
            // x86 memory model is TSO. Hardware does not reorder loads with other loads.
            // A compiler barrier is strictly sufficient.
            asm volatile ("" ::: .{ .memory = true });
        },
        .aarch64, .aarch64_be => {
            // Barrier affecting loads, inner-shareable domain.
            asm volatile ("dmb ishld" ::: .{ .memory = true });
        },
        .riscv64 => {
            // Orders prior loads before subsequent loads.
            asm volatile ("fence r, r" ::: .{ .memory = true });
        },
        else => {
            // Fallback: emulate a full sequence point using a dummy atomic RMW.
            // seq_cst RMW is stronger (and slower) than a pure LoadLoad fence, but it is
            // portable and correct on any target.
            var dummy: u8 = 0;
            _ = @cmpxchgWeak(u8, &dummy, 0, 0, .seq_cst, .seq_cst);
        },
    }
}
|
|
|
|
/// A hybrid Spinlock / Sequence Lock.
/// - Writers use `lock()` which utilizes a CAS-based spinlock and `unlock()`. Both increase the
///   version/timestamp.
/// - Readers use `readBegin()` and `readValid()` to perform lock-free reads.
///
/// The version counter doubles as the lock state: even = unlocked, odd = a writer holds the
/// lock. Each lock/unlock pair advances the counter by 2, so a reader that observes the same
/// even value before and after its data reads knows no writer intervened.
const ShardLock = struct {
    /// Even = Unlocked, Odd = Locked
    version: atomic.Value(u64) = atomic.Value(u64).init(0),
    /// To remove false_sharing.
    /// OPTIM: If there are a lot of shards compared to the number of cores, we could probably drop
    /// the padding and save some memory, while having similar performance.
    padding: [cache_line_size - @sizeOf(u64)]u8 = undefined,

    /// Spins until an unlocked (even) version is observed and returns it.
    /// The `.acquire` load pairs with the writer's `.release` unlock; the caller performs its
    /// data reads afterwards and then validates with `readValid`.
    fn readBegin(self: *const ShardLock) u64 {
        while (true) {
            const current = self.version.load(.acquire);
            if (current & 1 == 0) return current;
            atomic.spinLoopHint();
        }
    }

    /// Returns true if the version is unchanged since `readBegin` returned `ts`,
    /// i.e. no writer acquired the lock in between and the data read is consistent.
    fn readValid(self: *const ShardLock, ts: u64) bool {
        return ts == self.version.load(.acquire);
    }

    /// Acquires the writer lock by flipping the version from even to odd via CAS.
    /// Spins while another writer holds the lock.
    fn lock(self: *ShardLock) void {
        var current = self.version.load(.acquire);
        while (true) {
            // Wait for even
            while (current & 1 != 0) {
                atomic.spinLoopHint();
                current = self.version.load(.monotonic);
            }

            // CAS to switch to odd
            if (self.version.cmpxchgWeak(current, current + 1, .acquire, .monotonic)) |c| {
                current = c;
            } else {
                return; // Locked successfully
            }
        }
    }

    /// Releases the writer lock. The `.release` increment makes the version even again and
    /// publishes all writes performed while the lock was held.
    fn unlock(self: *ShardLock) void {
        const before = self.version.fetchAdd(1, .release);
        assert(before & 1 == 1);
    }
};
|
|
|
|
// Exercises the core single-threaded API: insert, lookup, overwrite, and removal.
test "basic usage" {
    var ht = try AutoHashMapConcurrent(u64, u64).init(testing.allocator, 4, 2);
    defer ht.deinit(testing.allocator);

    ht.put(1, 10);
    ht.put(2, 20);
    ht.put(3, 30);

    try testing.expectEqual(10, ht.get(1));
    try testing.expectEqual(20, ht.get(2));
    try testing.expectEqual(30, ht.get(3));
    try testing.expectEqual(null, ht.get(99));

    // Overwriting an existing key must replace the value in place.
    ht.put(2, 22);
    try testing.expectEqual(22, ht.get(2).?);

    // fetchRemove returns the removed value; the key must be gone afterwards.
    const val = ht.fetchRemove(2);
    try testing.expectEqual(22, val);
    try testing.expectEqual(null, ht.get(2));

    // Unrelated keys must survive the removal.
    try testing.expectEqual(10, ht.get(1));
    try testing.expectEqual(30, ht.get(3));
}
|
|
|
|
// Fills a capacity-4 table completely to force probing/displacement, then checks
// that backward-shift deletion keeps the remaining keys reachable.
test "collision and robin hood" {
    var ht = try AutoHashMapConcurrent(u64, u64).init(testing.allocator, 4, 4);
    defer ht.deinit(testing.allocator);

    ht.put(10, 100);
    ht.put(20, 200);
    ht.put(30, 300);
    ht.put(40, 400); // Full

    try testing.expectEqual(100, ht.get(10));
    try testing.expectEqual(400, ht.get(40));

    // Removing one key must not break lookups of the others.
    ht.remove(10);
    try testing.expectEqual(null, ht.get(10));
    try testing.expectEqual(400, ht.get(40));
}
|
|
|
|
// `clear` must drop every entry and reset the atomic counter to zero.
test "clear" {
    var ht = try AutoHashMapConcurrent(u64, u64).init(testing.allocator, 16, 4);
    defer ht.deinit(testing.allocator);

    const num_entries = 8;
    for (0..num_entries) |i| ht.put(i, i);
    try testing.expectEqual(num_entries, ht.count.load(.monotonic));
    for (0..num_entries) |i| try testing.expectEqual(i, ht.get(i));

    ht.clear();
    try testing.expectEqual(0, ht.count.load(.monotonic));
    for (0..num_entries) |i| try testing.expectEqual(null, ht.get(i));
}
|
|
|
|
// Both iterator flavors must visit every entry exactly once when there is no
// concurrent modification.
test "iterators basic" {
    var ht = try AutoHashMapConcurrent(u64, u64).init(testing.allocator, 16, 4);
    defer ht.deinit(testing.allocator);

    const num_entries = 10;
    for (0..num_entries) |i| ht.put(i, i);

    // Test Locking Iterator
    {
        var it = ht.lockingIterator();
        defer it.deinit();
        var count: usize = 0;
        while (it.next()) |entry| {
            try testing.expectEqual(entry.key, entry.value);
            count += 1;
        }
        try testing.expectEqual(num_entries, count);
    }

    // Test Optimistic Iterator
    {
        var it = ht.approximateIterator();
        var count: usize = 0;
        while (it.next()) |entry| {
            try testing.expectEqual(entry.key, entry.value);
            count += 1;
        }
        try testing.expectEqual(num_entries, count);
    }
}
|
|
|
|
// Abandoning the locking iterator early must release its shard locks via deinit;
// otherwise the put below would deadlock on a still-held shard.
test "locking iterator early break" {
    var ht = try AutoHashMapConcurrent(u64, u64).init(testing.allocator, 16, 4);
    defer ht.deinit(testing.allocator);

    for (0..10) |i| ht.put(i, i);

    {
        var it = ht.lockingIterator();
        defer it.deinit();
        _ = it.next();
    }

    // If deinit failed, this put will deadlock
    ht.put(99, 99);
    try testing.expectEqual(99, ht.get(99).?);
}
|
|
|
|
// Randomized single-threaded workload mirrored against std.AutoHashMapUnmanaged
// as a reference model; every lookup and (periodically) the full reference
// content must agree with the table.
test "single threaded fuzz" {
    const Seed = 42;
    const capacity = 1024;
    const shards = 32;
    const iterations = 100_000;

    var prng = std.Random.DefaultPrng.init(Seed);
    const random = prng.random();
    const allocator = testing.allocator;

    var ht = try AutoHashMapConcurrent(u64, u64).init(allocator, capacity, shards);
    defer ht.deinit(allocator);

    var ref_map = std.AutoHashMapUnmanaged(u64, u64).empty;
    try ref_map.ensureTotalCapacity(allocator, capacity * 2);
    defer ref_map.deinit(allocator);

    for (0..iterations) |i| {
        const action = random.uintAtMostBiased(u8, 9);
        // Key space is 8x capacity so that misses are common.
        const key = random.int(u64) & ((capacity * 8) - 1);

        if (action <= 2) { // put
            if (!ht.atCapacity()) {
                const val = random.int(u64);
                ht.put(key, val);
                ref_map.putAssumeCapacity(key, val);
            }
        } else if (action <= 7) { // fetch
            // `try` unwraps the error union, consistent with every other getChecked
            // call site in this file.
            const ht_val = try ht.getChecked(key);
            const ref_val = ref_map.get(key);
            try testing.expectEqual(ref_val, ht_val);
        } else { // remove
            ht.remove(key);
            _ = ref_map.remove(key);
        }

        // Periodically verify the whole reference content is present in the table.
        if (i % 1000 == 0) {
            var it = ref_map.iterator();
            while (it.next()) |entry| {
                const stored_val = ht.get(entry.key_ptr.*);
                try testing.expect(stored_val != null);
                try testing.expectEqual(entry.value_ptr.*, stored_val.?);
            }
        }
    }
}
|
|
|
|
/// Multi-threaded fuzz harness. Each thread operates on a disjoint key range
/// [id * keys_per_thread, (id + 1) * keys_per_thread), which lets every thread
/// validate the shared table against its own private reference map without
/// cross-thread interference. Runs `verifyIntegrity` at the end.
fn stressTest(
    comptime capacity: usize,
    comptime shards: u64,
    comptime iterations: u64,
    comptime keys_per_thread: u64,
    comptime num_threads: u64,
) !void {
    const allocator = testing.allocator;

    var ht = try AutoHashMapConcurrent(u64, u64).init(allocator, capacity, shards);
    defer ht.deinit(allocator);

    // Pure namespace for the worker function; the previously declared fields
    // (id, allocator, ht, ...) were dead state — the struct was never instantiated.
    const Context = struct {
        fn run(
            id: u64,
            alloc: Allocator,
            table: *AutoHashMapConcurrent(u64, u64),
            iter: u64,
            k_per_t: u64,
        ) !void {
            var prng = std.Random.DefaultPrng.init(id);
            const random = prng.random();

            var ref_map = std.AutoHashMapUnmanaged(u64, u64).empty;
            try ref_map.ensureTotalCapacity(alloc, capacity * 2);
            defer ref_map.deinit(alloc);

            // This thread's private key range.
            const start = id * k_per_t;
            const end = start + k_per_t;

            for (0..iter) |_| {
                const action = random.uintAtMostBiased(u8, 9);
                const key = random.intRangeLessThan(u64, start, end);

                if (action <= 3) { // put
                    if (!table.atCapacity()) {
                        const val = random.int(u64);
                        table.put(key, val);
                        ref_map.putAssumeCapacity(key, val);
                    }
                } else if (action <= 4) { // fetch
                    // `try` unwraps the error union, consistent with the other
                    // getChecked call sites in this file.
                    const ht_val = try table.getChecked(key);
                    const ref_val = ref_map.get(key);
                    try testing.expectEqual(ref_val, ht_val);
                } else { // remove
                    table.remove(key);
                    _ = ref_map.remove(key);
                }
            }
        }
    };

    var threads: [num_threads]std.Thread = undefined;
    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(
            .{ .allocator = allocator },
            Context.run,
            .{ i, allocator, &ht, iterations, keys_per_thread },
        );
    }

    for (threads) |t| {
        t.join();
    }
    try ht.verifyIntegrity();
}
|
|
|
|
// Default-sized multi-threaded fuzz run; see `stressTest` for the workload.
test "multithreaded fuzz" {
    try stressTest(
        1024, // capacity
        32, // shards
        100_000, // iterations per thread
        512, // keys per thread
        8, // threads
    );
}
|
|
|
|
// Long-running, large-table stress run. Skipped by default because it is far too
// slow for regular test runs; flip the condition below to `false` to run locally.
test "multithreaded stress" {
    if (true) {
        return error.SkipZigTest;
    }
    try stressTest(
        1024 * 1024, // capacity
        1024, // shards
        1_000_000_000, // iterations per thread
        1024 * 1024, // keys per thread
        8, // threads
    );
}
|
|
|
|
// Hammers a few hot keys from many threads. Each thread writes a value whose
// high and low 32-bit halves carry the same per-thread pattern, so any torn
// (half-updated) read or cross-thread clobber is immediately detectable.
test "torn reads and value clobbering" {
    const num_threads = 16;
    const capacity = 64;
    const shards = 4;
    const num_keys = 8;
    const keys = [_]u64{ 0, 1, 2, 3, 4, 5, 6, 7 };
    const time_ns = 1 * std.time.ns_per_s;

    const allocator = testing.allocator;
    var ht = try AutoHashMapConcurrent(u64, u64).init(allocator, capacity, shards);
    defer ht.deinit(allocator);

    var stop = atomic.Value(bool).init(false);

    const Context = struct {
        fn run(
            id: usize,
            _ht: *AutoHashMapConcurrent(u64, u64),
            _stop: *atomic.Value(bool),
        ) !void {
            var prng = std.Random.DefaultPrng.init(id);
            const random = prng.random();

            // Each thread has a unique pattern like 0x0000000100000001
            const thread_pattern: u32 = @intCast(id + 1);
            const thread_val: u64 = (@as(u64, thread_pattern) << 32) | thread_pattern;

            while (!_stop.load(.monotonic)) {
                const key = keys[random.uintLessThan(usize, num_keys)];

                // 50% Put, 50% Get
                if (random.boolean()) {
                    _ht.put(key, thread_val);
                } else {
                    const val = try _ht.getChecked(key);

                    if (val) |v| {
                        const high: u32 = @intCast(v >> 32);
                        const low: u32 = @intCast(v & 0xFFFFFFFF);

                        try testing.expectEqual(high, low); // torn read
                        // The observed pattern must belong to a real writer thread.
                        try testing.expect(low != 0);
                        try testing.expect(low <= num_threads);
                        try testing.expect(high != 0);
                        try testing.expect(high <= num_threads);
                    }
                }
            }
        }
    };

    var threads: [num_threads]std.Thread = undefined;
    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(
            .{},
            Context.run,
            .{ i, &ht, &stop },
        );
    }

    std.Thread.sleep(time_ns);
    stop.store(true, .monotonic);

    for (threads) |t| t.join();

    try ht.verifyIntegrity();
}
|
|
|
|
// Rounds of multi-threaded chaos (put/remove/get) followed by a stop-the-world
// verifyIntegrity pass, to catch structural corruption that only manifests
// under contention.
test "structural integrity" {
    const cap = 512;
    const shards = 16;
    const num_threads = 16;
    // Key range is 2x capacity so the table frequently runs near full.
    const key_range = 1024;
    const num_rounds = 50;
    const time_per_round_ns = 25 * std.time.ns_per_ms;

    const allocator = testing.allocator;
    var ht = try AutoHashMapConcurrent(u64, u64).init(allocator, cap, shards);
    defer ht.deinit(allocator);

    var stop = atomic.Value(bool).init(false);
    var exit = atomic.Value(bool).init(false);
    var start_sem = std.Thread.Semaphore{};
    var done_sem = std.Thread.Semaphore{};
    const Context = struct {
        fn run(
            _ht: *AutoHashMapConcurrent(u64, u64),
            _stop: *atomic.Value(bool),
            _exit: *atomic.Value(bool),
            _start: *std.Thread.Semaphore,
            _done: *std.Thread.Semaphore,
            seed: u64,
        ) !void {
            var prng = std.Random.DefaultPrng.init(seed);
            const random = prng.random();

            while (true) {
                // Wait for the coordinator to start the next round (or signal exit).
                _start.wait();
                if (_exit.load(.monotonic)) break;

                while (!_stop.load(.monotonic)) {
                    const key = random.uintLessThan(u64, key_range);
                    const action = random.uintLessThan(u8, 10);

                    if (action < 4) { // 40% Put
                        if (!_ht.atCapacity()) {
                            _ht.put(key, key);
                        }
                    } else if (action < 8) { // 40% Remove
                        _ht.remove(key);
                    } else { // 20% Get
                        _ = try _ht.getChecked(key);
                    }

                    // Yield occasionally to increase interleaving
                    if (random.uintLessThan(u8, 100) == 0) try std.Thread.yield();
                }
                _done.post();
            }
        }
    };

    var threads: [num_threads]std.Thread = undefined;
    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(
            .{},
            Context.run,
            .{ &ht, &stop, &exit, &start_sem, &done_sem, i },
        );
    }

    // We run multiple rounds of stress. In each rounds, threads cause chaos, then we stop them and
    // verify the table isn't corrupted.
    for (0..num_rounds) |_| {
        stop.store(false, .monotonic);
        for (0..num_threads) |_| start_sem.post();

        std.Thread.sleep(time_per_round_ns);

        // Stop the world
        stop.store(true, .monotonic);
        for (0..num_threads) |_| done_sem.wait();

        try ht.verifyIntegrity();
    }

    // Release the workers one final time so they observe `exit` and terminate.
    exit.store(true, .monotonic);
    for (0..num_threads) |_| start_sem.post();
    for (threads) |t| t.join();
}
|
|
|
|
// Per-key linearizability check: writers update the map and a mutex-protected
// reference slot atomically (under the same per-key lock), while an observer
// verifies — under that same lock — that the map never disagrees with the
// reference.
test "linearizability and reference matching" {
    const cap = 1024;
    const shards = 64;
    const num_keys = 128;
    const num_workers = 8;
    const time_ns = 1 * std.time.ns_per_s;

    const allocator = testing.allocator;
    var ht = try AutoHashMapConcurrent(u64, u64).init(allocator, cap, shards);
    defer ht.deinit(allocator);

    var stop = atomic.Value(bool).init(false);

    // Ground truth: one atomic value and one mutex per key.
    // Value 0 is reserved to mean "never written" (workers never store 0).
    const Reference = struct {
        values: [num_keys]atomic.Value(u64),
        locks: [num_keys]std.Thread.Mutex,

        fn init() @This() {
            var self: @This() = undefined;
            for (0..num_keys) |i| {
                self.values[i] = atomic.Value(u64).init(0);
                self.locks[i] = .{};
            }
            return self;
        }
    };
    var ref = Reference.init();

    const Worker = struct {
        fn run(
            table: *AutoHashMapConcurrent(u64, u64),
            reference: *Reference,
            is_stop: *atomic.Value(bool),
            seed: u64,
        ) !void {
            var prng = std.Random.DefaultPrng.init(seed);
            const random = prng.random();

            while (!is_stop.load(.monotonic)) {
                const key = random.uintLessThan(usize, num_keys);
                // Never write 0: it is the "absent" sentinel used by the observer.
                const val = random.intRangeAtMost(u64, 1, math.maxInt(u64) - 1);

                // Lock the reference key to ensure the Map and the Reference are updated atomically
                reference.locks[key].lock();
                defer reference.locks[key].unlock();

                table.put(key, val);
                reference.values[key].store(val, .release);

                if (random.uintLessThan(u8, 100) == 0) try std.Thread.yield();
            }
        }
    };

    const Observer = struct {
        fn run(
            table: *AutoHashMapConcurrent(u64, u64),
            reference: *Reference,
            is_stop: *atomic.Value(bool),
        ) !void {
            while (!is_stop.load(.monotonic)) {
                for (0..num_keys) |key| {
                    // Holding the per-key lock freezes both sides for the comparison.
                    reference.locks[key].lock();
                    defer reference.locks[key].unlock();

                    const map_val = try table.getChecked(key) orelse 0; // 0 instead of null
                    const ref_val = reference.values[key].load(.acquire);
                    try testing.expectEqual(ref_val, map_val);
                }
            }
        }
    };

    var workers: [num_workers]std.Thread = undefined;
    for (0..num_workers) |i| {
        workers[i] = try std.Thread.spawn(.{}, Worker.run, .{ &ht, &ref, &stop, i });
    }
    const observer_thread = try std.Thread.spawn(.{}, Observer.run, .{ &ht, &ref, &stop });

    std.Thread.sleep(time_ns);
    stop.store(true, .monotonic);

    for (workers) |t| t.join();
    observer_thread.join();

    // Final check
    // NOTE(review): this assumes every key was written at least once during the run;
    // a never-touched key would make the `.?` unwrap panic. With 8 workers hitting
    // 128 keys for a second this is overwhelmingly likely, but not guaranteed.
    for (0..num_keys) |i| {
        const map_v = try ht.getChecked(i);
        const ref_v = ref.values[i].load(.monotonic);
        try testing.expectEqual(ref_v, map_v.?);
    }
    try ht.verifyIntegrity();
}
|
|
|
|
/// Wrapper to make the standard HashMap thread-safe (one global mutex) for
/// benchmark comparison against the sharded concurrent map.
fn MutexHashMap(K: type, V: type, max_fill_factor: f64) type {
    const max_load_percentage: u64 = @intFromFloat(max_fill_factor * 100);
    return struct {
        const Self = @This();
        mutex: std.Thread.Mutex = .{},
        map: std.HashMapUnmanaged(K, V, std.hash_map.AutoContext(K), max_load_percentage) = .empty,

        /// Pre-allocates `capacity` slots so benchmark puts never allocate.
        /// The third parameter (shard count) is ignored; it only exists to match
        /// the concurrent map's init signature.
        fn init(allocator: Allocator, capacity: u32, _: usize) !Self {
            var self = Self{};
            // Pre-allocate to be fair
            try self.map.ensureTotalCapacity(allocator, capacity);
            return self;
        }

        fn deinit(self: *Self, allocator: Allocator) void {
            self.map.deinit(allocator);
        }

        /// True when no more entries can be inserted without reallocation.
        fn atCapacity(self: *Self) bool {
            return self.map.available == 0;
        }

        // The accessors below previously hardcoded `u64` parameters even though the
        // type is generic over K and V; they now use the type parameters so the
        // wrapper works for any POD key/value combination.
        fn put(self: *Self, k: K, v: V) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.map.putAssumeCapacity(k, v);
        }

        fn get(self: *Self, k: K) ?V {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.map.get(k);
        }

        fn remove(self: *Self, k: K) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            _ = self.map.remove(k);
        }
    };
}
|
|
|
|
/// Parameters for a single benchmark run.
const BenchOptions = struct {
    /// Human-readable label of the implementation under test (reporting only).
    name: []const u8,
    /// Capacity the map is initialized with.
    size: u32,
    /// Number of worker threads sharing `iterations` between them.
    num_threads: u64,
    /// Total number of operations across all threads.
    iterations: u64,
    /// Percentage (0-100) of operations that are puts.
    put_prob: u8,
    /// Percentage (0-100) of operations that are removes; the remainder are gets.
    remove_prob: u8,
    /// Optional single-thread baseline for speedup calculation.
    baseline_ns: ?u64 = null,
};
|
|
|
|
/// One benchmark thread: executes its share of the total operation count as a
/// randomized put/remove/get mix over a key space twice the table size.
/// Note: when the map is at capacity, a would-be put degrades into a remove
/// (the probability check falls through), keeping the op count constant.
fn benchWorker(
    comptime MapType: type,
    map: *MapType,
    seed: u64,
    options: BenchOptions,
) void {
    var prng = std.Random.DefaultPrng.init(seed);
    const random = prng.random();

    const ops_for_this_thread = options.iterations / options.num_threads;
    const key_space = options.size * 2;

    var op: u64 = 0;
    while (op < ops_for_this_thread) : (op += 1) {
        const roll = random.uintLessThanBiased(u8, 100);
        const key = random.uintLessThanBiased(u64, key_space);

        if (roll < options.put_prob and !map.atCapacity()) {
            map.put(key, key);
        } else if (roll < options.put_prob + options.remove_prob) {
            map.remove(key);
        } else {
            _ = map.get(key);
        }
    }
}
|
|
|
|
/// Runs one timed benchmark configuration: initializes the map, fills it to
/// capacity (to avoid empty-table bias), then times `num_threads` workers
/// executing the workload. Returns the wall-clock duration in nanoseconds.
fn runBench(
    allocator: Allocator,
    comptime MapType: type,
    options: BenchOptions,
) !u64 {
    // Plenty of shards relative to the thread count, clamped to [64, 1024].
    const num_shards = @min(@max(64, options.num_threads * 16), 1024);
    var map = try MapType.init(allocator, options.size, num_shards);
    defer map.deinit(allocator);

    const threads = try allocator.alloc(std.Thread, options.num_threads);
    defer allocator.free(threads);

    // Pre fill to avoid empty bias
    var next_key: u64 = 0;
    while (!map.atCapacity()) {
        map.put(next_key, next_key);
        next_key += 1;
    }

    const started_at = std.time.nanoTimestamp();
    for (threads, 0..) |*thread, i| {
        thread.* = try std.Thread.spawn(.{}, benchWorker, .{
            MapType, &map, i, options,
        });
    }
    for (threads) |t| t.join();
    return @intCast(std.time.nanoTimestamp() - started_at);
}
|
|
|
|
/// These are by no means statistically sound benchmarks. They are just to give a rough guidance on
/// how to choose parameters.
///
/// Produces a human-readable table on stdout and machine-readable rows in
/// `benchmark_results.csv`, sweeping load factor x workload x thread count for
/// both the concurrent map and a mutex-wrapped std HashMap baseline.
pub fn main() !void {
    const size = 1024 * 1024;
    const iterations = 10_000_000;
    const load_factors = [_]f64{ 0.5, 0.8, 0.9, 0.95, 0.98 };

    // Workload mixes: p = put%, r = remove%; the remainder are gets.
    const configs = [_]struct { name: []const u8, p: u8, r: u8 }{
        .{ .name = "Read-Heavy", .p = 3, .r = 2 },
        .{ .name = "Balanced", .p = 25, .r = 25 },
        .{ .name = "Write-Heavy", .p = 45, .r = 45 },
    };

    const allocator = std.heap.page_allocator;

    // Thread counts: powers of two up to the machine's logical CPU count.
    var thread_counts = std.ArrayListUnmanaged(u64).empty;
    defer thread_counts.deinit(allocator);

    const cpu_count = try std.Thread.getCpuCount();
    var t: u64 = 1;
    while (t <= cpu_count) : (t *= 2) {
        try thread_counts.append(allocator, t);
    }

    const csv_file = try std.fs.cwd().createFile("benchmark_results.csv", .{});
    defer csv_file.close();
    var csv_buffer: [1024]u8 = undefined;
    var csv_file_writer = csv_file.writer(&csv_buffer);
    const csv = &csv_file_writer.interface;
    try csv.print("implementation,load_factor,workload,threads,time_ns,ops_per_sec,speedup\n", .{});

    var stdout_buffer: [1024]u8 = undefined;
    var stdout_file_writer = std.fs.File.stdout().writer(&stdout_buffer);
    const stdout = &stdout_file_writer.interface;

    // Header
    try stdout.print(
        "{s:<14} | {s:<4} | {s:<11}",
        .{ "Implementation", "LF", "Workload" },
    );
    for (thread_counts.items) |threads| {
        try stdout.print(" | {d:>3} Threads", .{threads});
    }
    try stdout.print("\n", .{});

    // Separator
    try stdout.print("{s:-<14}-+-{s:-<4}-+-{s:-<11}", .{ "", "", "" });
    for (thread_counts.items) |_| try stdout.print("-+------------", .{});
    try stdout.print("\n", .{});
    try stdout.flush();

    inline for (load_factors) |lf| {
        for (configs) |cfg| {
            const impls = [_][]const u8{ "Concurrent", "Mutex" };
            inline for (impls) |impl_name| {
                // Do not spam with unnecessary mutex benchmarks.
                if (comptime mem.eql(u8, impl_name, "Mutex") and lf != 0.8) continue;

                try stdout.print("{s:<14} | {d:<4.2} | {s:<11}", .{ impl_name, lf, cfg.name });

                var baseline_ops: u64 = 0;
                for (thread_counts.items) |threads| {
                    const options = BenchOptions{
                        .name = impl_name,
                        .size = size,
                        .num_threads = threads,
                        .iterations = iterations,
                        .put_prob = cfg.p,
                        .remove_prob = cfg.r,
                    };

                    const time_ns = if (mem.eql(u8, impl_name, "Concurrent"))
                        try runBench(
                            allocator,
                            HashMapConcurrent(
                                u64,
                                u64,
                                getAutoHashFn(u64),
                                getAutoEqlFn(u64),
                                .{ .max_fill_factor = lf },
                            ),
                            options,
                        )
                    else
                        try runBench(allocator, MutexHashMap(u64, u64, lf), options);

                    try stdout.print(" | {D:>9} ", .{time_ns});

                    const total_ops: f64 = @floatFromInt(iterations);
                    const time_s: f64 = @as(f64, @floatFromInt(time_ns)) / 1e9;
                    const ops_sec: u64 = @intFromFloat(total_ops / time_s);

                    // The single-thread run of each row is the speedup baseline.
                    if (threads == 1) baseline_ops = ops_sec;
                    const speedup = if (baseline_ops > 0)
                        @as(f64, @floatFromInt(ops_sec)) / @as(f64, @floatFromInt(baseline_ops))
                    else
                        0;

                    try csv.print(
                        "{s},{d:.2},{s},{d},{d},{d},{d:.2}\n",
                        .{ impl_name, lf, cfg.name, threads, time_ns, ops_sec, speedup },
                    );
                }
                try stdout.print("\n", .{});
                try stdout.flush();
            }
        }
        // Small separator between Load Factors
        try stdout.print("{s:-<14}-+-{s:-<4}-+-{s:-<11}", .{ "", "", "" });
        for (thread_counts.items) |_| try stdout.print("-+------------", .{});
        try stdout.print("\n", .{});
        try stdout.flush();
    }
    try csv.flush();
    try stdout.flush();
}
|