pg sharding works

This commit is contained in:
Lev Kokotov
2022-02-08 11:10:28 -08:00
parent 27c05cfd53
commit 39a9dff26b
2 changed files with 72 additions and 17 deletions

View File

@@ -26,15 +26,17 @@ impl Sharder {
/// Hash function used by Postgres to determine which partition
/// to put the row in when using HASH(column) partitioning.
/// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631
pub fn pg_bigint_hash(&self, key: i64) -> usize {
/// Supports only 1 bigint at the moment, but we can add more later.
pub fn pg_bigint_hash(&self, key: i64) -> u64 {
let mut lohalf = key as u32;
let hihalf = (key >> 32) as u32;
lohalf ^= if key >= 0 { hihalf } else { !hihalf };
Self::pg_u32_hash(lohalf) as usize % self.shards
Self::combine(0, Self::pg_u32_hash(lohalf)) % self.shards as u64
}
#[inline]
fn rot(x: u32, k: u32) -> u32 {
((x) << (k)) | ((x) >> (32 - (k)))
(x << k) | (x >> (32 - k))
}
#[inline]
@@ -85,19 +87,27 @@ impl Sharder {
(a, b, c)
}
#[inline]
fn combine(mut a: u64, b: u64) -> u64 {
a ^= b
.wrapping_add(0x49a0f4dd15e5a8e3 as u64)
.wrapping_add(a << 54)
.wrapping_add(a >> 7);
a
}
fn pg_u32_hash(k: u32) -> u64 {
let mut a: u32 = 0x9e3779b9 as u32 + std::mem::size_of::<u32>() as u32 + 3923095 as u32;
let mut b = a;
let c = a;
let seed = PARTITION_HASH_SEED;
a = a.wrapping_add((seed >> 32) as u32);
b = b.wrapping_add(seed as u32);
a = a.wrapping_add((PARTITION_HASH_SEED >> 32) as u32);
b = b.wrapping_add(PARTITION_HASH_SEED as u32);
let (mut a, b, c) = Self::mix(a, b, c);
a = a.wrapping_add(k);
let (a, b, c) = Self::_final(a, b, c);
let (_a, b, c) = Self::_final(a, b, c);
((b as u64) << 32) | (c as u64)
}
@@ -117,17 +127,36 @@ mod test {
#[test]
fn test_pg_bigint_hash() {
let sharder = Sharder::new(2);
let key = 1 as i64;
let shard = sharder.pg_bigint_hash(key);
assert_eq!(shard, 0);
let sharder = Sharder::new(5);
let key = 2 as i64;
let shard = sharder.pg_bigint_hash(key);
assert_eq!(shard, 0);
let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53];
let key = 3 as i64;
let shard = sharder.pg_bigint_hash(key);
assert_eq!(shard, 1);
for v in shard_0 {
assert_eq!(sharder.pg_bigint_hash(v), 0);
}
let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54];
for v in shard_1 {
assert_eq!(sharder.pg_bigint_hash(v), 1);
}
let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35];
for v in shard_2 {
assert_eq!(sharder.pg_bigint_hash(v), 2);
}
let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43];
for v in shard_3 {
assert_eq!(sharder.pg_bigint_hash(v), 3);
}
let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45];
for v in shard_4 {
assert_eq!(sharder.pg_bigint_hash(v), 4);
}
}
}

26
tests/sharding/setup.sql Normal file
View File

@@ -0,0 +1,26 @@
DROP TABLE IF EXISTS shards CASCADE;
CREATE TABLE shards (
id BIGINT,
value VARCHAR
) PARTITION BY HASH (id);
-- DROP TABLE IF EXISTS shard_0;
CREATE TABLE shard_0 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 0);
-- DROP TABLE IF EXISTS shard_1;
CREATE TABLE shard_1 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 1);
-- DROP TABLE IF EXISTS shard_2;
CREATE TABLE shard_2 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 2);
-- DROP TABLE IF EXISTS shard_3;
CREATE TABLE shard_3 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 3);
-- DROP TABLE IF EXISTS shard_4;
CREATE TABLE shard_4 PARTITION OF shards FOR VALUES WITH (MODULUS 5, REMAINDER 4);
INSERT INTO shards SELECT generate_series(1, 500), 'value';
SELECT * FROM shard_0 ORDER BY id LIMIT 10;
SELECT * FROM shard_1 ORDER BY id LIMIT 10;
SELECT * FROM shard_2 ORDER BY id LIMIT 10;
SELECT * FROM shard_3 ORDER BY id LIMIT 10;
SELECT * FROM shard_4 ORDER BY id LIMIT 10;