diff --git a/src/sharding.rs b/src/sharding.rs index e9cd365..57cf807 100644 --- a/src/sharding.rs +++ b/src/sharding.rs @@ -26,15 +26,17 @@ impl Sharder { /// Hash function used by Postgres to determine which partition /// to put the row in when using HASH(column) partitioning. /// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631 - pub fn pg_bigint_hash(&self, key: i64) -> usize { + /// Supports only 1 bigint at the moment, but we can add more later. + pub fn pg_bigint_hash(&self, key: i64) -> u64 { let mut lohalf = key as u32; let hihalf = (key >> 32) as u32; lohalf ^= if key >= 0 { hihalf } else { !hihalf }; - Self::pg_u32_hash(lohalf) as usize % self.shards + Self::combine(0, Self::pg_u32_hash(lohalf)) % self.shards as u64 } + #[inline] fn rot(x: u32, k: u32) -> u32 { - ((x) << (k)) | ((x) >> (32 - (k))) + (x << k) | (x >> (32 - k)) } #[inline] @@ -85,19 +87,27 @@ impl Sharder { (a, b, c) } + #[inline] + fn combine(mut a: u64, b: u64) -> u64 { + a ^= b + .wrapping_add(0x49a0f4dd15e5a8e3 as u64) + .wrapping_add(a << 54) + .wrapping_add(a >> 7); + a + } + fn pg_u32_hash(k: u32) -> u64 { let mut a: u32 = 0x9e3779b9 as u32 + std::mem::size_of::() as u32 + 3923095 as u32; let mut b = a; let c = a; - let seed = PARTITION_HASH_SEED; - a = a.wrapping_add((seed >> 32) as u32); - b = b.wrapping_add(seed as u32); + a = a.wrapping_add((PARTITION_HASH_SEED >> 32) as u32); + b = b.wrapping_add(PARTITION_HASH_SEED as u32); let (mut a, b, c) = Self::mix(a, b, c); a = a.wrapping_add(k); - let (a, b, c) = Self::_final(a, b, c); + let (_a, b, c) = Self::_final(a, b, c); ((b as u64) << 32) | (c as u64) } @@ -117,17 +127,36 @@ mod test { #[test] fn test_pg_bigint_hash() { - let sharder = Sharder::new(2); - let key = 1 as i64; - let shard = sharder.pg_bigint_hash(key); - assert_eq!(shard, 0); + let sharder = Sharder::new(5); - let key = 2 as i64; - let shard = sharder.pg_bigint_hash(key); - assert_eq!(shard, 0); + let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53]; - let key = 3 as i64; - let shard = sharder.pg_bigint_hash(key); - assert_eq!(shard, 1); + for v in shard_0 { + assert_eq!(sharder.pg_bigint_hash(v), 0); + } + + let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54]; + + for v in shard_1 { + assert_eq!(sharder.pg_bigint_hash(v), 1); + } + + let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35]; + + for v in shard_2 { + assert_eq!(sharder.pg_bigint_hash(v), 2); + } + + let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43]; + + for v in shard_3 { + assert_eq!(sharder.pg_bigint_hash(v), 3); + } + + let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45]; + + for v in shard_4 { + assert_eq!(sharder.pg_bigint_hash(v), 4); + } } } diff --git a/tests/sharding/setup.sql b/tests/sharding/setup.sql new file mode 100644 index 0000000..e802ead --- /dev/null +++ b/tests/sharding/setup.sql @@ -0,0 +1,26 @@ +DROP TABLE IF EXISTS shards CASCADE; + +CREATE TABLE shards ( + id BIGINT, + value VARCHAR +) PARTITION BY HASH (id); + +-- DROP TABLE IF EXISTS shard_0; +CREATE TABLE shard_0 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 0); +-- DROP TABLE IF EXISTS shard_1; +CREATE TABLE shard_1 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 1); +-- DROP TABLE IF EXISTS shard_2; +CREATE TABLE shard_2 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 2); +-- DROP TABLE IF EXISTS shard_3; +CREATE TABLE shard_3 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 3); +-- DROP TABLE IF EXISTS shard_4; +CREATE TABLE shard_4 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 4); + + +INSERT INTO shards SELECT generate_series(1, 500), 'value'; + +SELECT * FROM shard_0 ORDER BY id LIMIT 10; +SELECT * FROM shard_1 ORDER BY id LIMIT 10; +SELECT * FROM shard_2 ORDER BY id LIMIT 10; +SELECT * FROM shard_3 ORDER BY id LIMIT 10; +SELECT * FROM shard_4 ORDER BY id LIMIT 10; \ No newline at end of file