diff --git a/pgcat.toml b/pgcat.toml index 9a240b3..2131802 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -96,3 +96,13 @@ query_parser_enabled = false # load balancing of read queries. Otherwise, the primary will only be used for write # queries. The primary can always be explicitely selected with our custom protocol. primary_reads_enabled = true + +# So what if you wanted to implement a different hashing function, +# or you've already built one and you want this pooler to use it? +# +# Current options: +# +# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function) +# sha1: A hashing function based on SHA1 +# +sharding_function = "pg_bigint_hash" diff --git a/src/config.rs b/src/config.rs index 04a8162..754dc4f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -118,6 +118,7 @@ pub struct QueryRouter { pub default_role: String, pub query_parser_enabled: bool, pub primary_reads_enabled: bool, + pub sharding_function: String, } impl Default for QueryRouter { @@ -126,6 +127,7 @@ impl Default for QueryRouter { default_role: String::from("any"), query_parser_enabled: false, primary_reads_enabled: true, + sharding_function: "pg_bigint_hash".to_string(), } } } @@ -159,6 +161,8 @@ impl Config { self.general.healthcheck_timeout ); info!("Connection timeout: {}ms", self.general.connect_timeout); + info!("Sharding function: {}", self.query_router.sharding_function); + info!("Number of shards: {}", self.shards.len()); } } @@ -193,6 +197,18 @@ pub async fn parse(path: &str) -> Result<(), Error> { } }; + match config.query_router.sharding_function.as_ref() { + "pg_bigint_hash" => (), + "sha1" => (), + _ => { + error!( + "Supported sharding functions are: 'pg_bigint_hash', 'sha1', got: '{}'", + config.query_router.sharding_function + ); + return Err(Error::BadConfig); + } + }; + // Quick config sanity check. for shard in &config.shards { // We use addresses as unique identifiers, diff --git a/src/query_router.rs b/src/query_router.rs index 8dddef0..3f309ea 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -1,5 +1,5 @@ use crate::config::{get_config, Role}; -use crate::sharding::Sharder; +use crate::sharding::{Sharder, ShardingFunction}; /// Route queries automatically based on explicitely requested /// or implied query characteristics. use bytes::{Buf, BytesMut}; @@ -48,6 +48,9 @@ pub struct QueryRouter { // Should we try to parse queries? query_parser_enabled: bool, + + // Which sharding function are we using? + sharding_function: ShardingFunction, } impl QueryRouter { @@ -76,6 +79,12 @@ impl QueryRouter { _ => unreachable!(), }; + let sharding_function = match config.query_router.sharding_function.as_ref() { + "pg_bigint_hash" => ShardingFunction::PgBigintHash, + "sha1" => ShardingFunction::Sha1, + _ => unreachable!(), + }; + QueryRouter { default_server_role: default_server_role, shards: config.shards.len(), @@ -84,6 +93,7 @@ impl QueryRouter { active_shard: None, primary_reads_enabled: config.query_router.primary_reads_enabled, query_parser_enabled: config.query_router.query_parser_enabled, + sharding_function, } } @@ -139,8 +149,8 @@ impl QueryRouter { match command { Command::SetShardingKey => { - let sharder = Sharder::new(self.shards); - let shard = sharder.pg_bigint_hash(value.parse::().unwrap()); + let sharder = Sharder::new(self.shards, self.sharding_function); + let shard = sharder.shard(value.parse::().unwrap()); self.active_shard = Some(shard); value = shard.to_string(); } diff --git a/src/sharding.rs b/src/sharding.rs index 7aed4d3..23eccc1 100644 --- a/src/sharding.rs +++ b/src/sharding.rs @@ -1,26 +1,62 @@ +use sha1::{Digest, Sha1}; + // https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20 const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD; +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum ShardingFunction { + PgBigintHash, + Sha1, +} + pub struct Sharder { shards: usize, + sharding_function: ShardingFunction, } impl Sharder { - pub fn new(shards: usize) -> Sharder { - Sharder { shards: shards } + pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder { + Sharder { + shards, + sharding_function, + } + } + + pub fn shard(&self, key: i64) -> usize { + match self.sharding_function { + ShardingFunction::PgBigintHash => self.pg_bigint_hash(key), + ShardingFunction::Sha1 => self.sha1(key), + } } /// Hash function used by Postgres to determine which partition /// to put the row in when using HASH(column) partitioning. /// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631 /// Supports only 1 bigint at the moment, but we can add more later. - pub fn pg_bigint_hash(&self, key: i64) -> usize { + fn pg_bigint_hash(&self, key: i64) -> usize { let mut lohalf = key as u32; let hihalf = (key >> 32) as u32; lohalf ^= if key >= 0 { hihalf } else { !hihalf }; Self::combine(0, Self::pg_u32_hash(lohalf)) as usize % self.shards } + /// Example of a hashing function based on SHA1. + fn sha1(&self, key: i64) -> usize { + let mut hasher = Sha1::new(); + + hasher.update(&key.to_string().as_bytes()); + + let result = hasher.finalize(); + + // Convert the SHA1 hash into hex so we can parse it as a large integer. + let hex = format!("{:x}", result); + + // Parse the last 8 bytes as an integer (8 bytes = bigint). + let key = i64::from_str_radix(&hex[hex.len() - 8..], 16).unwrap() as usize; + + key % self.shards + } + #[inline] fn rot(x: u32, k: u32) -> u32 { (x << k) | (x >> (32 - k)) @@ -109,36 +145,51 @@ mod test { // confirming that we implemented Postgres BIGINT hashing correctly. #[test] fn test_pg_bigint_hash() { - let sharder = Sharder::new(5); + let sharder = Sharder::new(5, ShardingFunction::PgBigintHash); let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53]; for v in shard_0 { - assert_eq!(sharder.pg_bigint_hash(v), 0); + assert_eq!(sharder.shard(v), 0); } let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54]; for v in shard_1 { - assert_eq!(sharder.pg_bigint_hash(v), 1); + assert_eq!(sharder.shard(v), 1); } let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35]; for v in shard_2 { - assert_eq!(sharder.pg_bigint_hash(v), 2); + assert_eq!(sharder.shard(v), 2); } let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43]; for v in shard_3 { - assert_eq!(sharder.pg_bigint_hash(v), 3); + assert_eq!(sharder.shard(v), 3); } let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45]; for v in shard_4 { - assert_eq!(sharder.pg_bigint_hash(v), 4); + assert_eq!(sharder.shard(v), 4); + } + } + + #[test] + fn test_sha1_hash() { + let sharder = Sharder::new(12, ShardingFunction::Sha1); + let ids = vec![ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + ]; + let shards = vec![ + 4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3, + ]; + + for (i, id) in ids.iter().enumerate() { + assert_eq!(sharder.shard(*id), shards[i]); } } }