Compare commits

...

2 Commits

Author SHA1 Message Date
Lev Kokotov
bca5318d5e Fix broken custom config 2022-09-12 15:58:11 -04:00
Lev Kokotov
efd6b2edae Automatic shard detection 2022-09-12 15:07:10 -04:00
5 changed files with 82 additions and 19 deletions

View File

@@ -82,6 +82,7 @@ primary_reads_enabled = true
# sha1: A hashing function based on SHA1 # sha1: A hashing function based on SHA1
# #
sharding_function = "pg_bigint_hash" sharding_function = "pg_bigint_hash"
sharding_key = "id"
# Credentials for users that may connect to this cluster # Credentials for users that may connect to this cluster
[pools.sharded_db.users.0] [pools.sharded_db.users.0]

View File

@@ -662,7 +662,7 @@ where
// Normal query, not a custom command. // Normal query, not a custom command.
None => { None => {
if query_router.query_parser_enabled() { if query_router.query_parser_enabled() {
query_router.infer_role(message.clone()); query_router.infer_role_and_shard(message.clone());
} }
} }

View File

@@ -185,6 +185,7 @@ pub struct Pool {
pub query_parser_enabled: bool, pub query_parser_enabled: bool,
pub primary_reads_enabled: bool, pub primary_reads_enabled: bool,
pub sharding_function: String, pub sharding_function: String,
pub sharding_key: Option<String>,
pub shards: HashMap<String, Shard>, pub shards: HashMap<String, Shard>,
pub users: HashMap<String, User>, pub users: HashMap<String, User>,
} }
@@ -198,6 +199,7 @@ impl Default for Pool {
query_parser_enabled: false, query_parser_enabled: false,
primary_reads_enabled: true, primary_reads_enabled: true,
sharding_function: "pg_bigint_hash".to_string(), sharding_function: "pg_bigint_hash".to_string(),
sharding_key: None,
} }
} }
} }

View File

@@ -8,6 +8,7 @@ use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::thread_rng; use rand::thread_rng;
use regex::Regex;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
@@ -68,6 +69,9 @@ pub struct PoolSettings {
// Sharding function. // Sharding function.
pub sharding_function: ShardingFunction, pub sharding_function: ShardingFunction,
// Automatically detect sharding key in query.
pub sharding_key_regex: Option<Regex>,
} }
impl Default for PoolSettings { impl Default for PoolSettings {
@@ -80,6 +84,7 @@ impl Default for PoolSettings {
query_parser_enabled: false, query_parser_enabled: false,
primary_reads_enabled: true, primary_reads_enabled: true,
sharding_function: ShardingFunction::PgBigintHash, sharding_function: ShardingFunction::PgBigintHash,
sharding_key_regex: None,
} }
} }
} }
@@ -229,6 +234,20 @@ impl ConnectionPool {
"sha1" => ShardingFunction::Sha1, "sha1" => ShardingFunction::Sha1,
_ => unreachable!(), _ => unreachable!(),
}, },
sharding_key_regex: match &pool_config.sharding_key {
Some(sharding_key) => match Regex::new(&format!(
r"(?i) *{} *= *'?([0-9]+)'?",
sharding_key
)) {
Ok(regex) => Some(regex),
Err(err) => {
error!("Sharding key regex error: {:?}", err);
return Err(Error::BadConfig);
}
},
None => None,
},
}, },
}; };

View File

@@ -55,6 +55,8 @@ pub struct QueryRouter {
/// Include the primary into the replica pool for reads. /// Include the primary into the replica pool for reads.
primary_reads_enabled: bool, primary_reads_enabled: bool,
set_manually: bool,
/// Pool configuration. /// Pool configuration.
pool_settings: PoolSettings, pool_settings: PoolSettings,
} }
@@ -97,6 +99,7 @@ impl QueryRouter {
active_role: None, active_role: None,
query_parser_enabled: false, query_parser_enabled: false,
primary_reads_enabled: false, primary_reads_enabled: false,
set_manually: false,
pool_settings: PoolSettings::default(), pool_settings: PoolSettings::default(),
} }
} }
@@ -104,6 +107,11 @@ impl QueryRouter {
/// Pool settings can change because of a config reload. /// Pool settings can change because of a config reload.
pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) { pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) {
self.pool_settings = pool_settings; self.pool_settings = pool_settings;
if !self.set_manually {
self.query_parser_enabled = self.pool_settings.query_parser_enabled;
self.primary_reads_enabled = self.pool_settings.primary_reads_enabled;
}
} }
/// Try to parse a command and execute it. /// Try to parse a command and execute it.
@@ -205,6 +213,8 @@ impl QueryRouter {
} }
Command::SetServerRole => { Command::SetServerRole => {
self.set_manually = true;
self.active_role = match value.to_ascii_lowercase().as_ref() { self.active_role = match value.to_ascii_lowercase().as_ref() {
"primary" => { "primary" => {
self.query_parser_enabled = false; self.query_parser_enabled = false;
@@ -228,7 +238,7 @@ impl QueryRouter {
"default" => { "default" => {
self.active_role = self.pool_settings.default_role; self.active_role = self.pool_settings.default_role;
self.query_parser_enabled = self.query_parser_enabled; self.query_parser_enabled = self.pool_settings.query_parser_enabled;
self.active_role self.active_role
} }
@@ -237,6 +247,8 @@ impl QueryRouter {
} }
Command::SetPrimaryReads => { Command::SetPrimaryReads => {
self.set_manually = true;
if value == "on" { if value == "on" {
debug!("Setting primary reads to on"); debug!("Setting primary reads to on");
self.primary_reads_enabled = true; self.primary_reads_enabled = true;
@@ -256,7 +268,7 @@ impl QueryRouter {
} }
/// Try to infer which server to connect to based on the contents of the query. /// Try to infer which server to connect to based on the contents of the query.
pub fn infer_role(&mut self, mut buf: BytesMut) -> bool { pub fn infer_role_and_shard(&mut self, mut buf: BytesMut) -> bool {
debug!("Inferring role"); debug!("Inferring role");
let code = buf.get_u8() as char; let code = buf.get_u8() as char;
@@ -297,6 +309,31 @@ impl QueryRouter {
_ => return false, _ => return false,
}; };
// First find the shard key
match &self.pool_settings.sharding_key_regex {
Some(re) => {
match re.captures(&query) {
Some(group) => match group.get(1) {
Some(value) => {
let value = value.as_str().parse::<i64>().unwrap();
let sharder = Sharder::new(
self.pool_settings.shards,
self.pool_settings.sharding_function,
);
let shard = sharder.shard(value);
self.active_shard = Some(shard);
debug!("Automatically routing to shard {}", shard);
}
None => (),
},
None => (),
};
}
None => (),
};
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) { let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
Ok(ast) => ast, Ok(ast) => ast,
Err(err) => { Err(err) => {
@@ -373,7 +410,7 @@ mod test {
} }
#[test] #[test]
fn test_infer_role_replica() { fn test_infer_role_and_shard_replica() {
QueryRouter::setup(); QueryRouter::setup();
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None); assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None);
@@ -382,22 +419,25 @@ mod test {
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO off")) != None); assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO off")) != None);
let queries = vec![ let queries = vec![
simple_query("SELECT * FROM items WHERE id = 5"), simple_query("SELECT * FROM items WHERE id = 4"),
simple_query( simple_query(
"SELECT id, name, value FROM items INNER JOIN prices ON item.id = prices.item_id", "SELECT id, name, value FROM items INNER JOIN prices ON item.id = prices.item_id",
), ),
simple_query("WITH t AS (SELECT * FROM items) SELECT * FROM t"), simple_query("WITH t AS (SELECT * FROM items) SELECT * FROM t"),
]; ];
for query in queries { let shards = vec![0, 0, 0];
for (idx, query) in queries.iter().enumerate() {
// It's a recognized query // It's a recognized query
assert!(qr.infer_role(query)); assert!(qr.infer_role_and_shard(query.clone()));
assert_eq!(qr.role(), Some(Role::Replica)); assert_eq!(qr.role(), Some(Role::Replica));
assert_eq!(qr.shard(), shards[idx]);
} }
} }
#[test] #[test]
fn test_infer_role_primary() { fn test_infer_role_and_shard_primary() {
QueryRouter::setup(); QueryRouter::setup();
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
@@ -410,24 +450,24 @@ mod test {
for query in queries { for query in queries {
// It's a recognized query // It's a recognized query
assert!(qr.infer_role(query)); assert!(qr.infer_role_and_shard(query));
assert_eq!(qr.role(), Some(Role::Primary)); assert_eq!(qr.role(), Some(Role::Primary));
} }
} }
#[test] #[test]
fn test_infer_role_primary_reads_enabled() { fn test_infer_role_and_shard_primary_reads_enabled() {
QueryRouter::setup(); QueryRouter::setup();
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
let query = simple_query("SELECT * FROM items WHERE id = 5"); let query = simple_query("SELECT * FROM items WHERE id = 5");
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None); assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None);
assert!(qr.infer_role(query)); assert!(qr.infer_role_and_shard(query));
assert_eq!(qr.role(), None); assert_eq!(qr.role(), None);
} }
#[test] #[test]
fn test_infer_role_parse_prepared() { fn test_infer_role_and_shard_parse_prepared() {
QueryRouter::setup(); QueryRouter::setup();
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")); qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'"));
@@ -442,7 +482,7 @@ mod test {
res.put(prepared_stmt); res.put(prepared_stmt);
res.put_i16(0); res.put_i16(0);
assert!(qr.infer_role(res)); assert!(qr.infer_role_and_shard(res));
assert_eq!(qr.role(), Some(Role::Replica)); assert_eq!(qr.role(), Some(Role::Replica));
} }
@@ -606,17 +646,17 @@ mod test {
assert_eq!(qr.role(), None); assert_eq!(qr.role(), None);
let query = simple_query("INSERT INTO test_table VALUES (1)"); let query = simple_query("INSERT INTO test_table VALUES (1)");
assert_eq!(qr.infer_role(query), true); assert_eq!(qr.infer_role_and_shard(query), true);
assert_eq!(qr.role(), Some(Role::Primary)); assert_eq!(qr.role(), Some(Role::Primary));
let query = simple_query("SELECT * FROM test_table"); let query = simple_query("SELECT * FROM test_table");
assert_eq!(qr.infer_role(query), true); assert_eq!(qr.infer_role_and_shard(query), true);
assert_eq!(qr.role(), Some(Role::Replica)); assert_eq!(qr.role(), Some(Role::Replica));
assert!(qr.query_parser_enabled()); assert!(qr.query_parser_enabled());
let query = simple_query("SET SERVER ROLE TO 'default'"); let query = simple_query("SET SERVER ROLE TO 'default'");
assert!(qr.try_execute_command(query) != None); assert!(qr.try_execute_command(query) != None);
assert!(qr.query_parser_enabled()); assert!(!qr.query_parser_enabled());
} }
#[test] #[test]
@@ -629,7 +669,8 @@ mod test {
user: crate::config::User::default(), user: crate::config::User::default(),
default_role: Some(Role::Replica), default_role: Some(Role::Replica),
query_parser_enabled: true, query_parser_enabled: true,
primary_reads_enabled: false, primary_reads_enabled: true,
sharding_key_regex: None,
sharding_function: ShardingFunction::PgBigintHash, sharding_function: ShardingFunction::PgBigintHash,
}; };
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
@@ -643,8 +684,8 @@ mod test {
assert_eq!(qr.active_role, None); assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None); assert_eq!(qr.active_shard, None);
assert_eq!(qr.query_parser_enabled, false); assert_eq!(qr.query_parser_enabled, true);
assert_eq!(qr.primary_reads_enabled, false); assert_eq!(qr.primary_reads_enabled, true);
let q1 = simple_query("SET SERVER ROLE TO 'primary'"); let q1 = simple_query("SET SERVER ROLE TO 'primary'");
assert!(qr.try_execute_command(q1) != None); assert!(qr.try_execute_command(q1) != None);