Re-enable query parser and parse multiple statements (#191)

* Re-enable query parser and parse multiple statements

* no diff
This commit is contained in:
Lev Kokotov
2022-10-23 16:59:51 -07:00
committed by GitHub
parent 19f635881a
commit dea952e4ca
3 changed files with 78 additions and 42 deletions

4
Cargo.lock generated
View File

@@ -875,9 +875,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "sqlparser" name = "sqlparser"
version = "0.23.0" version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0beb13adabbdda01b63d595f38c8bfd19a361e697fd94ce0098a634077bc5b25" checksum = "86be66ea0b2b22749cfa157d16e2e84bf793e626a3375f4d378dc289fa03affb"
dependencies = [ dependencies = [
"log", "log",
] ]

View File

@@ -20,7 +20,7 @@ serde_derive = "1"
regex = "1" regex = "1"
num_cpus = "1" num_cpus = "1"
once_cell = "1" once_cell = "1"
sqlparser = "0.23.0" sqlparser = "0.26.0"
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"
env_logger = "0.9" env_logger = "0.9"

View File

@@ -50,10 +50,10 @@ pub struct QueryRouter {
active_role: Option<Role>, active_role: Option<Role>,
/// Should we try to parse queries to route them to replicas or primary automatically /// Should we try to parse queries to route them to replicas or primary automatically
query_parser_enabled: bool, query_parser_enabled: Option<bool>,
/// Include the primary into the replica pool for reads. /// Include the primary into the replica pool for reads.
primary_reads_enabled: bool, primary_reads_enabled: Option<bool>,
/// Pool configuration. /// Pool configuration.
pool_settings: PoolSettings, pool_settings: PoolSettings,
@@ -95,8 +95,8 @@ impl QueryRouter {
QueryRouter { QueryRouter {
active_shard: None, active_shard: None,
active_role: None, active_role: None,
query_parser_enabled: false, query_parser_enabled: None,
primary_reads_enabled: false, primary_reads_enabled: None,
pool_settings: PoolSettings::default(), pool_settings: PoolSettings::default(),
} }
} }
@@ -172,7 +172,7 @@ impl QueryRouter {
Some(Role::Primary) => Role::Primary.to_string(), Some(Role::Primary) => Role::Primary.to_string(),
Some(Role::Replica) => Role::Replica.to_string(), Some(Role::Replica) => Role::Replica.to_string(),
None => { None => {
if self.query_parser_enabled { if self.query_parser_enabled() {
String::from("auto") String::from("auto")
} else { } else {
String::from("any") String::from("any")
@@ -180,7 +180,7 @@ impl QueryRouter {
} }
}, },
Command::ShowPrimaryReads => match self.primary_reads_enabled { Command::ShowPrimaryReads => match self.primary_reads_enabled() {
true => String::from("on"), true => String::from("on"),
false => String::from("off"), false => String::from("off"),
}, },
@@ -207,28 +207,28 @@ impl QueryRouter {
Command::SetServerRole => { Command::SetServerRole => {
self.active_role = match value.to_ascii_lowercase().as_ref() { self.active_role = match value.to_ascii_lowercase().as_ref() {
"primary" => { "primary" => {
self.query_parser_enabled = false; self.query_parser_enabled = Some(false);
Some(Role::Primary) Some(Role::Primary)
} }
"replica" => { "replica" => {
self.query_parser_enabled = false; self.query_parser_enabled = Some(false);
Some(Role::Replica) Some(Role::Replica)
} }
"any" => { "any" => {
self.query_parser_enabled = false; self.query_parser_enabled = Some(false);
None None
} }
"auto" => { "auto" => {
self.query_parser_enabled = true; self.query_parser_enabled = Some(true);
None None
} }
"default" => { "default" => {
self.active_role = self.pool_settings.default_role; self.active_role = self.pool_settings.default_role;
self.query_parser_enabled = self.query_parser_enabled; self.query_parser_enabled = None;
self.active_role self.active_role
} }
@@ -239,13 +239,13 @@ impl QueryRouter {
Command::SetPrimaryReads => { Command::SetPrimaryReads => {
if value == "on" { if value == "on" {
debug!("Setting primary reads to on"); debug!("Setting primary reads to on");
self.primary_reads_enabled = true; self.primary_reads_enabled = Some(true);
} else if value == "off" { } else if value == "off" {
debug!("Setting primary reads to off"); debug!("Setting primary reads to off");
self.primary_reads_enabled = false; self.primary_reads_enabled = Some(false);
} else if value == "default" { } else if value == "default" {
debug!("Setting primary reads to default"); debug!("Setting primary reads to default");
self.primary_reads_enabled = self.pool_settings.primary_reads_enabled; self.primary_reads_enabled = None;
} }
} }
@@ -300,34 +300,44 @@ impl QueryRouter {
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) { let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
Ok(ast) => ast, Ok(ast) => ast,
Err(err) => { Err(err) => {
debug!("{}", err.to_string()); // SELECT ... FOR UPDATE won't get parsed correctly.
error!("{}: {}", err, query);
self.active_role = Some(Role::Primary);
return false; return false;
} }
}; };
debug!("AST: {:?}", ast);
if ast.len() == 0 { if ast.len() == 0 {
// That's weird, no idea, let's go to primary
self.active_role = Some(Role::Primary);
return false; return false;
} }
match ast[0] { for q in &ast {
// All transactions go to the primary, probably a write. match q {
StartTransaction { .. } => { // All transactions go to the primary, probably a write.
self.active_role = Some(Role::Primary); StartTransaction { .. } => {
} self.active_role = Some(Role::Primary);
break;
// Likely a read-only query
Query { .. } => {
self.active_role = match self.primary_reads_enabled {
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
true => None, // Any server role is fine in this case.
} }
}
// Likely a write // Likely a read-only query
_ => { Query { .. } => {
self.active_role = Some(Role::Primary); self.active_role = match self.primary_reads_enabled() {
} false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
}; true => None, // Any server role is fine in this case.
}
}
// Likely a write
_ => {
self.active_role = Some(Role::Primary);
break;
}
};
}
true true
} }
@@ -350,9 +360,18 @@ impl QueryRouter {
} }
/// Should we attempt to parse queries? /// Should we attempt to parse queries?
#[allow(dead_code)]
pub fn query_parser_enabled(&self) -> bool { pub fn query_parser_enabled(&self) -> bool {
self.query_parser_enabled match self.query_parser_enabled {
None => self.pool_settings.query_parser_enabled,
Some(value) => value,
}
}
pub fn primary_reads_enabled(&self) -> bool {
match self.primary_reads_enabled {
None => self.pool_settings.primary_reads_enabled,
Some(value) => value,
}
} }
} }
@@ -616,7 +635,7 @@ mod test {
assert!(qr.query_parser_enabled()); assert!(qr.query_parser_enabled());
let query = simple_query("SET SERVER ROLE TO 'default'"); let query = simple_query("SET SERVER ROLE TO 'default'");
assert!(qr.try_execute_command(query) != None); assert!(qr.try_execute_command(query) != None);
assert!(qr.query_parser_enabled()); assert!(!qr.query_parser_enabled());
} }
#[test] #[test]
@@ -635,16 +654,16 @@ mod test {
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
assert_eq!(qr.active_role, None); assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None); assert_eq!(qr.active_shard, None);
assert_eq!(qr.query_parser_enabled, false); assert_eq!(qr.query_parser_enabled, None);
assert_eq!(qr.primary_reads_enabled, false); assert_eq!(qr.primary_reads_enabled, None);
// Internal state must not be changed due to this, only defaults // Internal state must not be changed due to this, only defaults
qr.update_pool_settings(pool_settings.clone()); qr.update_pool_settings(pool_settings.clone());
assert_eq!(qr.active_role, None); assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None); assert_eq!(qr.active_shard, None);
assert_eq!(qr.query_parser_enabled, false); assert_eq!(qr.query_parser_enabled(), true);
assert_eq!(qr.primary_reads_enabled, false); assert_eq!(qr.primary_reads_enabled(), false);
let q1 = simple_query("SET SERVER ROLE TO 'primary'"); let q1 = simple_query("SET SERVER ROLE TO 'primary'");
assert!(qr.try_execute_command(q1) != None); assert!(qr.try_execute_command(q1) != None);
@@ -654,4 +673,21 @@ mod test {
assert!(qr.try_execute_command(q2) != None); assert!(qr.try_execute_command(q2) != None);
assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role); assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role);
} }
#[test]
fn test_parse_multiple_queries() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
assert!(qr.infer_role(simple_query("BEGIN; SELECT 1; COMMIT;")));
assert_eq!(qr.role(), Role::Primary);
assert!(qr.infer_role(simple_query("SELECT 1; SELECT 2;")));
assert_eq!(qr.role(), Role::Replica);
assert!(qr.infer_role(simple_query(
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
)));
assert_eq!(qr.role(), Role::Primary);
}
} }