mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Automatic sharding: part one of many (#194)
Starting automatic sharding
This commit is contained in:
@@ -83,6 +83,9 @@ primary_reads_enabled = true
|
||||
#
|
||||
sharding_function = "pg_bigint_hash"
|
||||
|
||||
# Name of the sharding key column. Its value is parsed automatically from queries and used to route each query to the right shard!
|
||||
automatic_sharding_key = "id"
|
||||
|
||||
# Credentials for users that may connect to this cluster
|
||||
[pools.sharded_db.users.0]
|
||||
username = "sharding_user"
|
||||
|
||||
@@ -672,7 +672,7 @@ where
|
||||
// Normal query, not a custom command.
|
||||
None => {
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer_role(message.clone());
|
||||
query_router.infer(message.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -267,6 +267,10 @@ pub struct Pool {
|
||||
pub connect_timeout: Option<u64>,
|
||||
|
||||
pub sharding_function: ShardingFunction,
|
||||
|
||||
#[serde(default = "Pool::default_automatic_sharding_key")]
|
||||
pub automatic_sharding_key: Option<String>,
|
||||
|
||||
pub shards: BTreeMap<String, Shard>,
|
||||
pub users: BTreeMap<String, User>,
|
||||
}
|
||||
@@ -276,6 +280,10 @@ impl Pool {
|
||||
PoolMode::Transaction
|
||||
}
|
||||
|
||||
/// Serde default for `automatic_sharding_key`.
///
/// Returns `None`, i.e. no automatic sharding key is set when the
/// configuration omits the setting.
pub fn default_automatic_sharding_key() -> Option<String> {
    None
}
|
||||
|
||||
pub fn validate(&self) -> Result<(), Error> {
|
||||
match self.default_role.as_ref() {
|
||||
"any" => (),
|
||||
@@ -318,6 +326,7 @@ impl Default for Pool {
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: false,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
automatic_sharding_key: None,
|
||||
connect_timeout: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,6 +79,9 @@ pub struct PoolSettings {
|
||||
|
||||
// Sharding function.
|
||||
pub sharding_function: ShardingFunction,
|
||||
|
||||
// Sharding key
|
||||
pub automatic_sharding_key: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for PoolSettings {
|
||||
@@ -91,6 +94,7 @@ impl Default for PoolSettings {
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: true,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
automatic_sharding_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -254,6 +258,7 @@ impl ConnectionPool {
|
||||
query_parser_enabled: pool_config.query_parser_enabled.clone(),
|
||||
primary_reads_enabled: pool_config.primary_reads_enabled,
|
||||
sharding_function: pool_config.sharding_function,
|
||||
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use log::{debug, error};
|
||||
use once_cell::sync::OnceCell;
|
||||
use regex::{Regex, RegexSet};
|
||||
use sqlparser::ast::Statement::{Query, StartTransaction};
|
||||
use sqlparser::ast::{BinaryOperator, Expr, SetExpr, Value};
|
||||
use sqlparser::dialect::PostgreSqlDialect;
|
||||
use sqlparser::parser::Parser;
|
||||
|
||||
@@ -12,6 +13,8 @@ use crate::config::Role;
|
||||
use crate::pool::PoolSettings;
|
||||
use crate::sharding::Sharder;
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
/// Regexes used to parse custom commands.
|
||||
const CUSTOM_SQL_REGEXES: [&str; 7] = [
|
||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||
@@ -256,7 +259,7 @@ impl QueryRouter {
|
||||
}
|
||||
|
||||
/// Try to infer which server to connect to based on the contents of the query.
|
||||
pub fn infer_role(&mut self, mut buf: BytesMut) -> bool {
|
||||
pub fn infer(&mut self, mut buf: BytesMut) -> bool {
|
||||
debug!("Inferring role");
|
||||
|
||||
let code = buf.get_u8() as char;
|
||||
@@ -324,7 +327,21 @@ impl QueryRouter {
|
||||
}
|
||||
|
||||
// Likely a read-only query
|
||||
Query { .. } => {
|
||||
Query(query) => {
|
||||
match &self.pool_settings.automatic_sharding_key {
|
||||
Some(_) => {
|
||||
// TODO: if we have multiple queries in the same message,
|
||||
// we can either split them and execute them individually
|
||||
// or discard shard selection. If they point to the same shard though,
|
||||
// we can let them through as-is.
|
||||
// This is basically building a database now :)
|
||||
self.active_shard = self.infer_shard(query);
|
||||
debug!("Automatically using shard: {:?}", self.active_shard);
|
||||
}
|
||||
|
||||
None => (),
|
||||
};
|
||||
|
||||
self.active_role = match self.primary_reads_enabled() {
|
||||
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
|
||||
true => None, // Any server role is fine in this case.
|
||||
@@ -342,6 +359,118 @@ impl QueryRouter {
|
||||
true
|
||||
}
|
||||
|
||||
/// A `selection` is the `WHERE` clause. This parses
|
||||
/// the clause and extracts the sharding key, if present.
|
||||
fn selection_parser(&self, expr: &Expr) -> Vec<i64> {
|
||||
let mut result = Vec::new();
|
||||
let mut found = false;
|
||||
|
||||
match expr {
|
||||
// This parses `sharding_key = 5`. But it's technically
|
||||
// legal to write `5 = sharding_key`. I don't judge the people
|
||||
// who do that, but I think ORMs will still use the first variant,
|
||||
// so we can leave the second as a TODO.
|
||||
Expr::BinaryOp { left, op, right } => {
|
||||
match &**left {
|
||||
Expr::BinaryOp { .. } => result.extend(self.selection_parser(&left)),
|
||||
Expr::Identifier(ident) => {
|
||||
found = ident.value
|
||||
== *self.pool_settings.automatic_sharding_key.as_ref().unwrap();
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
|
||||
match op {
|
||||
BinaryOperator::Eq => (),
|
||||
BinaryOperator::Or => (),
|
||||
BinaryOperator::And => (),
|
||||
_ => {
|
||||
// TODO: support other operators than equality.
|
||||
debug!("Unsupported operation: {:?}", op);
|
||||
return Vec::new();
|
||||
}
|
||||
};
|
||||
|
||||
match &**right {
|
||||
Expr::BinaryOp { .. } => result.extend(self.selection_parser(&right)),
|
||||
Expr::Value(Value::Number(value, ..)) => {
|
||||
if found {
|
||||
match value.parse::<i64>() {
|
||||
Ok(value) => result.push(value),
|
||||
Err(_) => {
|
||||
debug!("Sharding key was not an integer: {}", value);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
|
||||
_ => (),
|
||||
};
|
||||
|
||||
debug!("Sharding keys found: {:?}", result);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Try to figure out which shard the query should go to.
|
||||
fn infer_shard(&self, query: &sqlparser::ast::Query) -> Option<usize> {
|
||||
let mut shards = BTreeSet::new();
|
||||
|
||||
match &*query.body {
|
||||
SetExpr::Query(query) => {
|
||||
match self.infer_shard(&*query) {
|
||||
Some(shard) => {
|
||||
shards.insert(shard);
|
||||
}
|
||||
None => (),
|
||||
};
|
||||
}
|
||||
|
||||
SetExpr::Select(select) => {
|
||||
match &select.selection {
|
||||
Some(selection) => {
|
||||
let sharding_keys = self.selection_parser(&selection);
|
||||
|
||||
// TODO: Add support for prepared statements here.
|
||||
// This should just give us the position of the value in the `B` message.
|
||||
|
||||
let sharder = Sharder::new(
|
||||
self.pool_settings.shards,
|
||||
self.pool_settings.sharding_function,
|
||||
);
|
||||
|
||||
for value in sharding_keys {
|
||||
let shard = sharder.shard(value);
|
||||
shards.insert(shard);
|
||||
}
|
||||
}
|
||||
|
||||
None => (),
|
||||
};
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
|
||||
match shards.len() {
|
||||
// Didn't find a sharding key, you're on your own.
|
||||
0 => {
|
||||
debug!("No sharding keys found");
|
||||
None
|
||||
}
|
||||
|
||||
1 => Some(shards.into_iter().last().unwrap()),
|
||||
|
||||
// TODO: support querying multiple shards (some day...)
|
||||
_ => {
|
||||
debug!("More than one sharding key found");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current desired server role we should be talking to.
|
||||
pub fn role(&self) -> Option<Role> {
|
||||
self.active_role
|
||||
@@ -392,7 +521,7 @@ mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_replica() {
|
||||
fn test_infer_replica() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None);
|
||||
@@ -410,13 +539,13 @@ mod test {
|
||||
|
||||
for query in queries {
|
||||
// It's a recognized query
|
||||
assert!(qr.infer_role(query));
|
||||
assert!(qr.infer(query));
|
||||
assert_eq!(qr.role(), Some(Role::Replica));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_primary() {
|
||||
fn test_infer_primary() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
|
||||
@@ -429,24 +558,24 @@ mod test {
|
||||
|
||||
for query in queries {
|
||||
// It's a recognized query
|
||||
assert!(qr.infer_role(query));
|
||||
assert!(qr.infer(query));
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_primary_reads_enabled() {
|
||||
fn test_infer_primary_reads_enabled() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
||||
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None);
|
||||
|
||||
assert!(qr.infer_role(query));
|
||||
assert!(qr.infer(query));
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_parse_prepared() {
|
||||
fn test_infer_parse_prepared() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'"));
|
||||
@@ -461,7 +590,7 @@ mod test {
|
||||
res.put(prepared_stmt);
|
||||
res.put_i16(0);
|
||||
|
||||
assert!(qr.infer_role(res));
|
||||
assert!(qr.infer(res));
|
||||
assert_eq!(qr.role(), Some(Role::Replica));
|
||||
}
|
||||
|
||||
@@ -625,11 +754,11 @@ mod test {
|
||||
assert_eq!(qr.role(), None);
|
||||
|
||||
let query = simple_query("INSERT INTO test_table VALUES (1)");
|
||||
assert_eq!(qr.infer_role(query), true);
|
||||
assert_eq!(qr.infer(query), true);
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
let query = simple_query("SELECT * FROM test_table");
|
||||
assert_eq!(qr.infer_role(query), true);
|
||||
assert_eq!(qr.infer(query), true);
|
||||
assert_eq!(qr.role(), Some(Role::Replica));
|
||||
|
||||
assert!(qr.query_parser_enabled());
|
||||
@@ -644,12 +773,13 @@ mod test {
|
||||
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
shards: 0,
|
||||
shards: 2,
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
query_parser_enabled: true,
|
||||
primary_reads_enabled: false,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
automatic_sharding_key: Some(String::from("id")),
|
||||
};
|
||||
let mut qr = QueryRouter::new();
|
||||
assert_eq!(qr.active_role, None);
|
||||
@@ -672,6 +802,11 @@ mod test {
|
||||
let q2 = simple_query("SET SERVER ROLE TO 'default'");
|
||||
assert!(qr.try_execute_command(q2) != None);
|
||||
assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role);
|
||||
|
||||
// Here we go :)
|
||||
let q3 = simple_query("SELECT * FROM test WHERE id = 5 AND values IN (1, 2, 3)");
|
||||
assert!(qr.infer(q3));
|
||||
assert_eq!(qr.shard(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -679,13 +814,13 @@ mod test {
|
||||
QueryRouter::setup();
|
||||
|
||||
let mut qr = QueryRouter::new();
|
||||
assert!(qr.infer_role(simple_query("BEGIN; SELECT 1; COMMIT;")));
|
||||
assert!(qr.infer(simple_query("BEGIN; SELECT 1; COMMIT;")));
|
||||
assert_eq!(qr.role(), Role::Primary);
|
||||
|
||||
assert!(qr.infer_role(simple_query("SELECT 1; SELECT 2;")));
|
||||
assert!(qr.infer(simple_query("SELECT 1; SELECT 2;")));
|
||||
assert_eq!(qr.role(), Role::Replica);
|
||||
|
||||
assert!(qr.infer_role(simple_query(
|
||||
assert!(qr.infer(simple_query(
|
||||
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
|
||||
)));
|
||||
assert_eq!(qr.role(), Role::Primary);
|
||||
|
||||
Reference in New Issue
Block a user