Add DB activity based routing (#864)

This commit is contained in:
Nadav Shatz
2024-12-22 13:23:57 +02:00
committed by GitHub
parent b37d105184
commit 3202f5685b
9 changed files with 757 additions and 38 deletions

View File

@@ -2,6 +2,7 @@
/// or implied query characteristics.
use bytes::{Buf, BytesMut};
use log::{debug, error};
use mini_moka::sync::Cache;
use once_cell::sync::OnceCell;
use regex::{Regex, RegexSet};
use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update};
@@ -11,6 +12,7 @@ use sqlparser::ast::{
};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::sync::OnceLock;
use crate::config::Role;
use crate::errors::Error;
@@ -21,6 +23,7 @@ use crate::sharding::Sharder;
use std::collections::BTreeSet;
use std::io::Cursor;
use std::time::Duration;
use std::{cmp, mem};
/// Regexes used to parse custom commands.
@@ -66,6 +69,18 @@ static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
// Get the value inside the custom command.
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
#[derive(Debug, Clone, PartialEq)]
enum DatabaseActivityState {
Active,
Initializing,
}
// A moka cache for the databases
// the key is the database name and the value is the database activity state
static DATABASE_ACTIVITY_CACHE: OnceLock<Cache<String, DatabaseActivityState>> = OnceLock::new();
// A moka cache for the tables, the key is the db_table.
static TABLE_MUTATIONS_CACHE: OnceLock<Cache<String, bool>> = OnceLock::new();
/// The query router.
pub struct QueryRouter {
/// Which shard we should be talking to right now.
@@ -87,6 +102,12 @@ pub struct QueryRouter {
placeholders: Vec<i16>,
}
struct ExtractedExprsAndTables<'a> {
exprs: Vec<Expr>,
table_names: Vec<Vec<Ident>>,
assignments_opt: Option<&'a Vec<Assignment>>,
}
impl QueryRouter {
/// One-time initialization of regexes
/// that parse our custom SQL protocol.
@@ -398,6 +419,41 @@ impl QueryRouter {
}
}
fn database_activity_cache(&self) -> Cache<String, DatabaseActivityState> {
DATABASE_ACTIVITY_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_idle(Duration::from_secs(self.pool_settings.db_activity_ttl))
.build()
})
.clone()
}
/// Check database activity state and reset it if necessary
fn database_activity_state(&self, db: &String) -> DatabaseActivityState {
let cache = self.database_activity_cache();
// Exists in cache
if cache.contains_key(db) {
return cache.get(db).unwrap();
}
// Not in cache
debug!("Adding database to cache: {}", db);
cache.insert(db.to_string(), DatabaseActivityState::Initializing);
// Set a timer to update the cache
let db = db.clone();
let db_activity_init_delay = self.pool_settings.db_activity_init_delay;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(db_activity_init_delay)).await;
cache.insert(db, DatabaseActivityState::Active);
});
DatabaseActivityState::Initializing
}
/// Try to infer which server to connect to based on the contents of the query.
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
if !self.pool_settings.query_parser_read_write_splitting {
@@ -412,9 +468,23 @@ impl QueryRouter {
return Err(Error::QueryRouterParserError("empty query".into()));
}
let mut primary_set_based_on_activity = false;
let mut visited_write_statement = false;
let mut prev_inferred_shard = None;
if self.pool_settings.db_activity_based_routing {
let db = self.pool_settings.db.clone();
let state = self.database_activity_state(&db);
debug!("Database activity state: {:?}", state);
if let DatabaseActivityState::Initializing = state {
debug!("Database is initializing, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
}
}
for q in ast {
match q {
// All transactions go to the primary, probably a write.
@@ -425,6 +495,22 @@ impl QueryRouter {
// Likely a read-only query
Query(query) => {
if primary_set_based_on_activity {
// If we already set the role based on activity, we don't need to do it again
continue;
}
if self.pool_settings.db_activity_based_routing {
// Check if the tables in the query have been written to recently
if self.query_handles_tables_in_mutation_cache(query) {
debug!("Query handles tables in mutation cache, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
continue;
}
}
match &self.pool_settings.automatic_sharding_key {
Some(_) => {
// TODO: if we have multiple queries in the same message,
@@ -455,6 +541,13 @@ impl QueryRouter {
// Likely a write
_ => {
debug!("Write statement found, going to primary");
if self.pool_settings.db_activity_based_routing {
// add all of the query tables to the mutation cache
self.update_mutation_cache_on_write(q);
}
match &self.pool_settings.automatic_sharding_key {
Some(_) => {
// TODO: similar to the above, if we have multiple queries in the
@@ -497,11 +590,40 @@ impl QueryRouter {
Ok(())
}
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
let mut exprs = Vec::new();
fn table_mutations_cache(&self) -> Cache<String, bool> {
TABLE_MUTATIONS_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_live(Duration::from_millis(
self.pool_settings.table_mutation_cache_ms_ttl,
))
.build()
})
.clone()
}
// Collect all table names from the query.
fn query_handles_tables_in_mutation_cache(&self, query: &sqlparser::ast::Query) -> bool {
let table_mutations_cache = self.table_mutations_cache();
debug!("Checking if query handles tables in mutation cache");
debug!("Table mutations cache: {:?}", table_mutations_cache);
for tables in self.table_names(query) {
for table in tables {
if table_mutations_cache.contains_key(&self.table_mutation_cache_key(table)) {
return true;
}
}
}
false
}
fn extract_exprs_and_table_names<'a>(
&'a self,
q: &'a Statement,
) -> Option<ExtractedExprsAndTables<'a>> {
let mut exprs = Vec::new();
let mut table_names = Vec::new();
let mut assignments_opt = None;
match q {
Insert(i) => {
@@ -520,7 +642,7 @@ impl QueryRouter {
exprs.push(expr.clone());
}
// Multi tables delete are not supported in postgres.
// Multi-tables delete are not supported in postgres.
assert!(d.tables.is_empty());
if let Some(using_tbl_with_join) = &d.using {
@@ -544,14 +666,55 @@ impl QueryRouter {
Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names);
}
Self::process_selection(selection, &mut exprs);
self.assignment_parser(assignments)?;
}
_ => {
return Ok(None);
assignments_opt = Some(assignments);
}
_ => return None,
};
Ok(self.infer_shard_from_exprs(exprs, table_names))
Some(ExtractedExprsAndTables {
exprs,
table_names,
assignments_opt,
})
}
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
let exprs = extracted.exprs;
let table_names = extracted.table_names;
let assignments_opt = extracted.assignments_opt;
if let Some(assignments) = assignments_opt {
self.assignment_parser(assignments)?;
}
Ok(self.infer_shard_from_exprs(exprs, table_names))
} else {
Ok(None)
}
}
fn update_mutation_cache_on_write(&self, q: &Statement) {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
debug!("Updating mutation cache on write");
let table_names = extracted.table_names;
debug!("Table names in mutation query: {:?}", table_names);
let table_mutations_cache = self.table_mutations_cache();
for tables in table_names {
for table in tables {
table_mutations_cache.insert(self.table_mutation_cache_key(table), true);
}
}
}
}
// combines the database name and table name into a single string
// to be used as the key in the table mutation cache
// e.g. "mydb.mytable"
fn table_mutation_cache_key(&self, table: Ident) -> String {
format!("{}.{}", self.pool_settings.db, table.value)
}
fn process_query(
@@ -939,6 +1102,18 @@ impl QueryRouter {
self.infer_shard_from_exprs(exprs, table_names)
}
/// get table names from query
fn table_names(&self, query: &sqlparser::ast::Query) -> Vec<Vec<Ident>> {
let mut exprs = Vec::new();
let mut table_names = Vec::new();
Self::process_query(query, &mut exprs, &mut table_names, &None);
debug!("Table names in query: {:?}", table_names);
table_names
}
fn infer_shard_from_exprs(
&mut self,
exprs: Vec<Expr>,
@@ -1106,6 +1281,7 @@ mod test {
use crate::messages::simple_query;
use crate::sharding::ShardingFunction;
use bytes::BufMut;
use serial_test::serial;
#[test]
fn test_defaults() {
@@ -1461,6 +1637,10 @@ mod test {
auth_query_password: None,
auth_query_user: None,
db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None,
};
let mut qr = QueryRouter::new();
@@ -1539,6 +1719,10 @@ mod test {
auth_query_password: None,
auth_query_user: None,
db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None,
};
@@ -1954,4 +2138,150 @@ mod test {
assert_eq!(res, Ok(PluginOutput::Allow));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_initializing_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Initially, the database activity should be in the "Initializing" state
let state = qr.database_activity_state(&qr.pool_settings.db.clone());
assert_eq!(state, DatabaseActivityState::Initializing);
// Check that the router chooses the primary role due to "Initializing" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_active_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
let db_name = qr.pool_settings.db.clone();
let cache = qr.database_activity_cache();
cache.insert(db_name.clone(), DatabaseActivityState::Active);
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Check that the router can choose a replica role when in "Active" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), None); // Default should allow replica due to active state
}
#[tokio::test]
#[serial]
async fn test_table_mutation_cache_on_write() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.table_mutation_cache_ms_ttl = 20_000; // 20 seconds in milliseconds
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("UPDATE some_table SET col1 = 'value' WHERE col2 = 1");
let ast = qr.parse(&query).unwrap();
// Simulate the mutation query which should populate the mutation cache
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
let table_cache_key = qr.table_mutation_cache_key(Ident::new("some_table"));
let cache = qr.table_mutations_cache();
// Ensure the table mutation cache contains the table with recent write
assert!(cache.contains_key(&table_cache_key));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_multi_query() {
use super::*;
use crate::messages::simple_query;
use tokio::time::Duration;
QueryRouter::setup();
let mut qr = QueryRouter::new();
// Configure the pool settings for db_activity_based_routing
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.db = "test_db_activity_routing".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
// First query when database is initializing
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
// Should route to primary because database is initializing
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the initialization delay to pass
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.db_activity_init_delay * 2,
))
.await;
// Next query after database is active
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because database is active and no recent mutations
assert_eq!(qr.role(), None);
// Simulate a write query to update the mutation cache
let query = simple_query("INSERT INTO test_table (id, name) VALUES (1, 'test')");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because it's a write operation
assert_eq!(qr.role(), Some(Role::Primary));
// Immediately run a read query on the same table
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because the table was recently mutated
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the mutation cache TTL to expire
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.table_mutation_cache_ms_ttl * 2,
))
.await;
// Run the read query again after cache expiration
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because mutation cache has expired
assert_eq!(qr.role(), None);
}
}