mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
7 Commits
dependabot
...
mostafa_te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae843a9797 | ||
|
|
6ffdf5f5b0 | ||
|
|
5b038813eb | ||
|
|
4fdf73c599 | ||
|
|
3349cecc18 | ||
|
|
f8e2fcd0ed | ||
|
|
3202f5685b |
2
.github/workflows/chart-lint-test.yaml
vendored
2
.github/workflows/chart-lint-test.yaml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
version: v3.8.1
|
||||
|
||||
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.1.0
|
||||
with:
|
||||
|
||||
52
CONFIG.md
52
CONFIG.md
@@ -298,6 +298,19 @@ Load balancing mode
|
||||
`random` selects the server at random
|
||||
`loc` selects the server with the least outstanding busy connections
|
||||
|
||||
### checkout_failure_limit
|
||||
```
|
||||
path: pools.<pool_name>.checkout_failure_limit
|
||||
default: 0 (disabled)
|
||||
```
|
||||
|
||||
`Maximum number of checkout failures a client is allowed before it
|
||||
gets disconnected. This is needed to prevent persistent client/server
|
||||
imbalance in high availability setups where multiple PgCat instances are placed
|
||||
behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
||||
in session mode
|
||||
`
|
||||
### default_role
|
||||
```
|
||||
path: pools.<pool_name>.default_role
|
||||
@@ -309,6 +322,45 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
|
||||
`replica` round-robin between replicas only without touching the primary,
|
||||
`primary` all queries go to the primary unless otherwise specified.
|
||||
|
||||
### db_activity_based_routing
|
||||
```
|
||||
path: pools.<pool_name>.db_activity_based_routing
|
||||
default: false
|
||||
```
|
||||
|
||||
If enabled, PgCat will route queries to the primary if the queried table was recently written to.
|
||||
Only relevant when `query_parser_enabled` *and* `query_parser_read_write_splitting` are both enabled.
|
||||
|
||||
##### Considerations:
|
||||
- *This feature is experimental and may not work as expected.*
|
||||
- This feature only works when the same PgCat instance is used for both reads and writes to the database.
|
||||
- This feature is not relevant when the primary is not part of the pool of databases used for load balancing of read queries.
|
||||
- If more than one PgCat instance is used for HA purposes, this feature will not work as expected. A way to still make it work is by using sticky sessions.
|
||||
|
||||
### db_activity_based_ms_init_delay
|
||||
```
|
||||
path: pools.<pool_name>.db_activity_based_ms_init_delay
|
||||
default: 100
|
||||
```
|
||||
|
||||
The delay in milliseconds before the first activity-based routing check is performed.
|
||||
|
||||
### db_activity_ttl
|
||||
```
|
||||
path: pools.<pool_name>.db_activity_ttl
|
||||
default: 900
|
||||
```
|
||||
|
||||
The time in seconds after which a DB is considered inactive when no queries/updates are performed on it.
|
||||
|
||||
### table_mutation_cache_ms_ttl
|
||||
```
|
||||
path: pools.<pool_name>.table_mutation_cache_ms_ttl
|
||||
default: 50
|
||||
```
|
||||
|
||||
The time in milliseconds, following a write to a table, during which all queries to that table are routed to the primary.
|
||||
|
||||
### prepared_statements_cache_size
|
||||
```
|
||||
path: general.prepared_statements_cache_size
|
||||
|
||||
695
Cargo.lock
generated
695
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "pgcat"
|
||||
version = "1.2.0"
|
||||
version = "1.3.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
@@ -27,7 +27,7 @@ hmac = "0.12"
|
||||
sha2 = "0.10"
|
||||
base64 = "0.21"
|
||||
stringprep = "0.1"
|
||||
tokio-rustls = "0.26"
|
||||
tokio-rustls = "0.24"
|
||||
rustls-pemfile = "1"
|
||||
http-body-util = "0.1.2"
|
||||
hyper = { version = "1.4.1", features = ["full"] }
|
||||
@@ -55,6 +55,10 @@ tracing-subscriber = { version = "0.3.17", features = [
|
||||
"std",
|
||||
] }
|
||||
lru = "0.12.0"
|
||||
mini-moka = "0.10.3"
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
jemallocator = "0.5.0"
|
||||
|
||||
[dev-dependencies]
|
||||
serial_test = "*"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:1.79.0-slim-bookworm AS builder
|
||||
FROM rust:1.81.0-slim-bookworm AS builder
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM cimg/rust:1.79.0
|
||||
FROM cimg/rust:1.81.0
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
RUN /bin/yj -h
|
||||
RUN sudo apt-get update && \
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
apiVersion: v2
|
||||
name: pgcat
|
||||
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
||||
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
||||
maintainers:
|
||||
- name: PostgresML
|
||||
email: team@postgresml.org
|
||||
appVersion: "1.2.0"
|
||||
appVersion: "1.3.0"
|
||||
version: 0.2.5
|
||||
|
||||
@@ -51,6 +51,10 @@ stringData:
|
||||
query_parser_enabled = {{ default true $pool.query_parser_enabled }}
|
||||
query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }}
|
||||
primary_reads_enabled = {{ default true $pool.primary_reads_enabled }}
|
||||
db_activity_based_routing = {{ default false $pool.db_activity_based_routing }}
|
||||
db_activity_based_ms_init_delay = {{ default 100 $pool.db_activity_based_ms_init_delay }}
|
||||
db_activity_ttl = {{ default 900 $pool.db_activity_ttl }}
|
||||
table_mutation_cache_ttl = {{ default 50 $pool.table_mutation_cache_ttl }}
|
||||
sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }}
|
||||
|
||||
{{- range $index, $user := $pool.users }}
|
||||
|
||||
@@ -298,6 +298,22 @@ configuration:
|
||||
# ## @param configuration.poolsPostgres.query_parser_read_write_splitting
|
||||
# query_parser_read_write_splitting: true
|
||||
|
||||
# ## Db activity based routing. If enabled, we'll route queries to the primary if the table was recently mutated.
|
||||
# ## @param configuration.poolsPostgres.db_activity_based_routing
|
||||
# db_activity_based_routing: false
|
||||
|
||||
# ## DB activity based init delay. How long (in milliseconds) a newly tracked database stays in its initializing state, during which all queries are routed to the primary.
|
||||
# ## @param configuration.poolsPostgres.db_activity_based_ms_init_delay
|
||||
# db_activity_based_ms_init_delay: 100
|
||||
|
||||
# ## DB activity TTL. How long before marking the DB as inactive after no mutations or queries.
|
||||
# ## @param configuration.poolsPostgres.db_activity_ttl
|
||||
# db_activity_ttl: 900
|
||||
|
||||
# ## Table mutation cache TTL. How long to keep track of table mutations.
|
||||
# ## @param configuration.poolsPostgres.table_mutation_cache_ttl
|
||||
# table_mutation_cache_ttl: 50
|
||||
|
||||
# ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
|
||||
# ## load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
# ## queries. The primary can always be explicitly selected with our custom protocol.
|
||||
|
||||
@@ -859,6 +859,8 @@ where
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
|
||||
let mut checkout_failure_count: u64 = 0;
|
||||
|
||||
self.stats.register(self.stats.clone());
|
||||
|
||||
// Result returned by one of the plugins.
|
||||
@@ -1108,7 +1110,25 @@ where
|
||||
query_router.role(),
|
||||
err
|
||||
);
|
||||
|
||||
checkout_failure_count += 1;
|
||||
if let Some(limit) = pool.settings.checkout_failure_limit {
|
||||
if checkout_failure_count >= limit {
|
||||
error!(
|
||||
"Checkout failure limit reached ({} / {}) - disconnecting client",
|
||||
checkout_failure_count, limit
|
||||
);
|
||||
error_response_terminal(
|
||||
&mut self.write,
|
||||
&format!(
|
||||
"checkout failure limit reached ({} / {})",
|
||||
checkout_failure_count, limit
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
self.stats.disconnect();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -558,6 +558,14 @@ pub struct Pool {
|
||||
/// Close idle connections that have been opened for longer than this.
|
||||
pub idle_timeout: Option<u64>,
|
||||
|
||||
/// Maximum number of checkout failures a client is allowed before it
|
||||
/// gets disconnected. This is needed to prevent persistent client/server
|
||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
|
||||
/// in session mode
|
||||
pub checkout_failure_limit: Option<u64>,
|
||||
|
||||
/// Close server connections that have been opened for longer than this.
|
||||
/// Only applied to idle connections. If the connection is actively used for
|
||||
/// longer than this period, the pool will not interrupt it.
|
||||
@@ -589,6 +597,19 @@ pub struct Pool {
|
||||
#[serde(default = "Pool::default_prepared_statements_cache_size")]
|
||||
pub prepared_statements_cache_size: usize,
|
||||
|
||||
// Support for query routing based on database activity
|
||||
#[serde(default = "Pool::default_db_activity_based_routing")]
|
||||
pub db_activity_based_routing: bool,
|
||||
|
||||
#[serde(default = "Pool::default_db_activity_init_delay")]
|
||||
pub db_activity_init_delay: u64,
|
||||
|
||||
#[serde(default = "Pool::default_db_activity_ttl")]
|
||||
pub db_activity_ttl: u64,
|
||||
|
||||
#[serde(default = "Pool::default_table_mutation_cache_ms_ttl")]
|
||||
pub table_mutation_cache_ms_ttl: u64,
|
||||
|
||||
pub plugins: Option<Plugins>,
|
||||
pub shards: BTreeMap<String, Shard>,
|
||||
pub users: BTreeMap<String, User>,
|
||||
@@ -642,6 +663,25 @@ impl Pool {
|
||||
0
|
||||
}
|
||||
|
||||
pub fn default_db_activity_based_routing() -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn default_db_activity_init_delay() -> u64 {
|
||||
// 100 milliseconds
|
||||
100
|
||||
}
|
||||
|
||||
pub fn default_db_activity_ttl() -> u64 {
|
||||
// 15 minutes
|
||||
15 * 60
|
||||
}
|
||||
|
||||
pub fn default_table_mutation_cache_ms_ttl() -> u64 {
|
||||
// 50 milliseconds
|
||||
50
|
||||
}
|
||||
|
||||
pub fn validate(&mut self) -> Result<(), Error> {
|
||||
match self.default_role.as_ref() {
|
||||
"any" => (),
|
||||
@@ -724,6 +764,23 @@ impl Pool {
|
||||
user.validate()?;
|
||||
}
|
||||
|
||||
if self.db_activity_based_routing {
|
||||
if self.db_activity_init_delay == 0 {
|
||||
error!("db_activity_init_delay must be greater than 0");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
if self.table_mutation_cache_ms_ttl == 0 {
|
||||
error!("table_mutation_cache_ms_ttl must be greater than 0");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
if self.db_activity_ttl == 0 {
|
||||
error!("db_activity_ttl must be greater than 0");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -733,6 +790,7 @@ impl Default for Pool {
|
||||
Pool {
|
||||
pool_mode: Self::default_pool_mode(),
|
||||
load_balancing_mode: Self::default_load_balancing_mode(),
|
||||
checkout_failure_limit: None,
|
||||
default_role: String::from("any"),
|
||||
query_parser_enabled: false,
|
||||
query_parser_max_length: None,
|
||||
@@ -753,6 +811,10 @@ impl Default for Pool {
|
||||
cleanup_server_connections: true,
|
||||
log_client_parameter_status_changes: false,
|
||||
prepared_statements_cache_size: Self::default_prepared_statements_cache_size(),
|
||||
db_activity_based_routing: Self::default_db_activity_based_routing(),
|
||||
db_activity_init_delay: Self::default_db_activity_init_delay(),
|
||||
db_activity_ttl: Self::default_db_activity_ttl(),
|
||||
table_mutation_cache_ms_ttl: Self::default_table_mutation_cache_ms_ttl(),
|
||||
plugins: None,
|
||||
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
|
||||
users: BTreeMap::default(),
|
||||
@@ -1245,6 +1307,17 @@ impl Config {
|
||||
None => self.general.idle_timeout,
|
||||
};
|
||||
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
|
||||
match pool_config.checkout_failure_limit {
|
||||
Some(checkout_failure_limit) => {
|
||||
info!(
|
||||
"[pool: {}] Checkout failure limit: {}",
|
||||
pool_name, checkout_failure_limit
|
||||
);
|
||||
}
|
||||
None => {
|
||||
info!("[pool: {}] Checkout failure limit: not set", pool_name);
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"[pool: {}] Sharding function: {}",
|
||||
pool_name,
|
||||
@@ -1289,6 +1362,22 @@ impl Config {
|
||||
"[pool: {}] Cleanup server connections: {}",
|
||||
pool_name, pool_config.cleanup_server_connections
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] DB activity based routing: {}",
|
||||
pool_name, pool_config.db_activity_based_routing
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] DB activity init delay: {}",
|
||||
pool_name, pool_config.db_activity_init_delay
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] DB activity TTL: {}",
|
||||
pool_name, pool_config.db_activity_ttl
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Table mutation cache TTL: {}",
|
||||
pool_name, pool_config.table_mutation_cache_ms_ttl
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Log client parameter status changes: {}",
|
||||
pool_name, pool_config.log_client_parameter_status_changes
|
||||
|
||||
@@ -3,7 +3,7 @@ use tracing_subscriber;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
pub fn init(args: &Args) {
|
||||
// Iniitalize a default filter, and then override the builtin default "warning" with our
|
||||
// Initialize a default filter, and then override the builtin default "warning" with our
|
||||
// commandline, (default: "info")
|
||||
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());
|
||||
|
||||
|
||||
30
src/pool.rs
30
src/pool.rs
@@ -152,6 +152,14 @@ pub struct PoolSettings {
|
||||
/// Random or LeastOutstandingConnections.
|
||||
pub load_balancing_mode: LoadBalancingMode,
|
||||
|
||||
/// Maximum number of checkout failures a client is allowed before it
|
||||
/// gets disconnected. This is needed to prevent persistent client/server
|
||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
|
||||
/// in session mode
|
||||
pub checkout_failure_limit: Option<u64>,
|
||||
|
||||
// Number of shards.
|
||||
pub shards: usize,
|
||||
|
||||
@@ -174,6 +182,18 @@ pub struct PoolSettings {
|
||||
// Read from the primary as well or not.
|
||||
pub primary_reads_enabled: bool,
|
||||
|
||||
// Automatic primary/replica selection based on recent activity.
|
||||
pub db_activity_based_routing: bool,
|
||||
|
||||
// DB activity init delay
|
||||
pub db_activity_init_delay: u64,
|
||||
|
||||
// DB activity TTL
|
||||
pub db_activity_ttl: u64,
|
||||
|
||||
// Table mutation cache TTL
|
||||
pub table_mutation_cache_ms_ttl: u64,
|
||||
|
||||
// Sharding function.
|
||||
pub sharding_function: ShardingFunction,
|
||||
|
||||
@@ -215,6 +235,7 @@ impl Default for PoolSettings {
|
||||
PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: LoadBalancingMode::Random,
|
||||
checkout_failure_limit: None,
|
||||
shards: 1,
|
||||
user: User::default(),
|
||||
db: String::default(),
|
||||
@@ -223,6 +244,10 @@ impl Default for PoolSettings {
|
||||
query_parser_max_length: None,
|
||||
query_parser_read_write_splitting: false,
|
||||
primary_reads_enabled: true,
|
||||
db_activity_based_routing: false,
|
||||
db_activity_init_delay: 100,
|
||||
db_activity_ttl: 15 * 60,
|
||||
table_mutation_cache_ms_ttl: 50,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
automatic_sharding_key: None,
|
||||
healthcheck_delay: General::default_healthcheck_delay(),
|
||||
@@ -521,6 +546,7 @@ impl ConnectionPool {
|
||||
None => pool_config.pool_mode,
|
||||
},
|
||||
load_balancing_mode: pool_config.load_balancing_mode,
|
||||
checkout_failure_limit: pool_config.checkout_failure_limit,
|
||||
// shards: pool_config.shards.clone(),
|
||||
shards: shard_ids.len(),
|
||||
user: user.clone(),
|
||||
@@ -537,6 +563,10 @@ impl ConnectionPool {
|
||||
.query_parser_read_write_splitting,
|
||||
primary_reads_enabled: pool_config.primary_reads_enabled,
|
||||
sharding_function: pool_config.sharding_function,
|
||||
db_activity_based_routing: pool_config.db_activity_based_routing,
|
||||
db_activity_init_delay: pool_config.db_activity_init_delay,
|
||||
db_activity_ttl: pool_config.db_activity_ttl,
|
||||
table_mutation_cache_ms_ttl: pool_config.table_mutation_cache_ms_ttl,
|
||||
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
|
||||
healthcheck_delay: config.general.healthcheck_delay,
|
||||
healthcheck_timeout: config.general.healthcheck_timeout,
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
/// or implied query characteristics.
|
||||
use bytes::{Buf, BytesMut};
|
||||
use log::{debug, error};
|
||||
use mini_moka::sync::Cache;
|
||||
use once_cell::sync::OnceCell;
|
||||
use regex::{Regex, RegexSet};
|
||||
use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update};
|
||||
@@ -11,6 +12,7 @@ use sqlparser::ast::{
|
||||
};
|
||||
use sqlparser::dialect::PostgreSqlDialect;
|
||||
use sqlparser::parser::Parser;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use crate::config::Role;
|
||||
use crate::errors::Error;
|
||||
@@ -21,6 +23,7 @@ use crate::sharding::Sharder;
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::io::Cursor;
|
||||
use std::time::Duration;
|
||||
use std::{cmp, mem};
|
||||
|
||||
/// Regexes used to parse custom commands.
|
||||
@@ -66,6 +69,18 @@ static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
|
||||
// Get the value inside the custom command.
|
||||
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
enum DatabaseActivityState {
|
||||
Active,
|
||||
Initializing,
|
||||
}
|
||||
|
||||
// A moka cache for the databases
|
||||
// the key is the database name and the value is the database activity state
|
||||
static DATABASE_ACTIVITY_CACHE: OnceLock<Cache<String, DatabaseActivityState>> = OnceLock::new();
|
||||
// A moka cache for the tables, the key is the db_table.
|
||||
static TABLE_MUTATIONS_CACHE: OnceLock<Cache<String, bool>> = OnceLock::new();
|
||||
|
||||
/// The query router.
|
||||
pub struct QueryRouter {
|
||||
/// Which shard we should be talking to right now.
|
||||
@@ -87,6 +102,12 @@ pub struct QueryRouter {
|
||||
placeholders: Vec<i16>,
|
||||
}
|
||||
|
||||
struct ExtractedExprsAndTables<'a> {
|
||||
exprs: Vec<Expr>,
|
||||
table_names: Vec<Vec<Ident>>,
|
||||
assignments_opt: Option<&'a Vec<Assignment>>,
|
||||
}
|
||||
|
||||
impl QueryRouter {
|
||||
/// One-time initialization of regexes
|
||||
/// that parse our custom SQL protocol.
|
||||
@@ -398,6 +419,41 @@ impl QueryRouter {
|
||||
}
|
||||
}
|
||||
|
||||
fn database_activity_cache(&self) -> Cache<String, DatabaseActivityState> {
|
||||
DATABASE_ACTIVITY_CACHE
|
||||
.get_or_init(|| {
|
||||
Cache::builder()
|
||||
.time_to_idle(Duration::from_secs(self.pool_settings.db_activity_ttl))
|
||||
.build()
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Check database activity state and reset it if necessary
|
||||
fn database_activity_state(&self, db: &String) -> DatabaseActivityState {
|
||||
let cache = self.database_activity_cache();
|
||||
|
||||
// Exists in cache
|
||||
if cache.contains_key(db) {
|
||||
return cache.get(db).unwrap();
|
||||
}
|
||||
|
||||
// Not in cache
|
||||
debug!("Adding database to cache: {}", db);
|
||||
|
||||
cache.insert(db.to_string(), DatabaseActivityState::Initializing);
|
||||
|
||||
// Set a timer to update the cache
|
||||
let db = db.clone();
|
||||
let db_activity_init_delay = self.pool_settings.db_activity_init_delay;
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(db_activity_init_delay)).await;
|
||||
cache.insert(db, DatabaseActivityState::Active);
|
||||
});
|
||||
|
||||
DatabaseActivityState::Initializing
|
||||
}
|
||||
|
||||
/// Try to infer which server to connect to based on the contents of the query.
|
||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
||||
if !self.pool_settings.query_parser_read_write_splitting {
|
||||
@@ -412,9 +468,23 @@ impl QueryRouter {
|
||||
return Err(Error::QueryRouterParserError("empty query".into()));
|
||||
}
|
||||
|
||||
let mut primary_set_based_on_activity = false;
|
||||
let mut visited_write_statement = false;
|
||||
let mut prev_inferred_shard = None;
|
||||
|
||||
if self.pool_settings.db_activity_based_routing {
|
||||
let db = self.pool_settings.db.clone();
|
||||
let state = self.database_activity_state(&db);
|
||||
debug!("Database activity state: {:?}", state);
|
||||
|
||||
if let DatabaseActivityState::Initializing = state {
|
||||
debug!("Database is initializing, going to primary");
|
||||
|
||||
self.active_role = Some(Role::Primary);
|
||||
primary_set_based_on_activity = true;
|
||||
}
|
||||
}
|
||||
|
||||
for q in ast {
|
||||
match q {
|
||||
// All transactions go to the primary, probably a write.
|
||||
@@ -425,6 +495,22 @@ impl QueryRouter {
|
||||
|
||||
// Likely a read-only query
|
||||
Query(query) => {
|
||||
if primary_set_based_on_activity {
|
||||
// If we already set the role based on activity, we don't need to do it again
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.pool_settings.db_activity_based_routing {
|
||||
// Check if the tables in the query have been written to recently
|
||||
if self.query_handles_tables_in_mutation_cache(query) {
|
||||
debug!("Query handles tables in mutation cache, going to primary");
|
||||
|
||||
self.active_role = Some(Role::Primary);
|
||||
primary_set_based_on_activity = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match &self.pool_settings.automatic_sharding_key {
|
||||
Some(_) => {
|
||||
// TODO: if we have multiple queries in the same message,
|
||||
@@ -455,6 +541,13 @@ impl QueryRouter {
|
||||
|
||||
// Likely a write
|
||||
_ => {
|
||||
debug!("Write statement found, going to primary");
|
||||
|
||||
if self.pool_settings.db_activity_based_routing {
|
||||
// add all of the query tables to the mutation cache
|
||||
self.update_mutation_cache_on_write(q);
|
||||
}
|
||||
|
||||
match &self.pool_settings.automatic_sharding_key {
|
||||
Some(_) => {
|
||||
// TODO: similar to the above, if we have multiple queries in the
|
||||
@@ -497,11 +590,40 @@ impl QueryRouter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
|
||||
let mut exprs = Vec::new();
|
||||
fn table_mutations_cache(&self) -> Cache<String, bool> {
|
||||
TABLE_MUTATIONS_CACHE
|
||||
.get_or_init(|| {
|
||||
Cache::builder()
|
||||
.time_to_live(Duration::from_millis(
|
||||
self.pool_settings.table_mutation_cache_ms_ttl,
|
||||
))
|
||||
.build()
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
|
||||
// Collect all table names from the query.
|
||||
fn query_handles_tables_in_mutation_cache(&self, query: &sqlparser::ast::Query) -> bool {
|
||||
let table_mutations_cache = self.table_mutations_cache();
|
||||
debug!("Checking if query handles tables in mutation cache");
|
||||
debug!("Table mutations cache: {:?}", table_mutations_cache);
|
||||
|
||||
for tables in self.table_names(query) {
|
||||
for table in tables {
|
||||
if table_mutations_cache.contains_key(&self.table_mutation_cache_key(table)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
fn extract_exprs_and_table_names<'a>(
|
||||
&'a self,
|
||||
q: &'a Statement,
|
||||
) -> Option<ExtractedExprsAndTables<'a>> {
|
||||
let mut exprs = Vec::new();
|
||||
let mut table_names = Vec::new();
|
||||
let mut assignments_opt = None;
|
||||
|
||||
match q {
|
||||
Insert(i) => {
|
||||
@@ -520,7 +642,7 @@ impl QueryRouter {
|
||||
exprs.push(expr.clone());
|
||||
}
|
||||
|
||||
// Multi tables delete are not supported in postgres.
|
||||
// Multi-tables delete are not supported in postgres.
|
||||
assert!(d.tables.is_empty());
|
||||
|
||||
if let Some(using_tbl_with_join) = &d.using {
|
||||
@@ -544,14 +666,55 @@ impl QueryRouter {
|
||||
Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names);
|
||||
}
|
||||
Self::process_selection(selection, &mut exprs);
|
||||
self.assignment_parser(assignments)?;
|
||||
}
|
||||
_ => {
|
||||
return Ok(None);
|
||||
|
||||
assignments_opt = Some(assignments);
|
||||
}
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
Ok(self.infer_shard_from_exprs(exprs, table_names))
|
||||
Some(ExtractedExprsAndTables {
|
||||
exprs,
|
||||
table_names,
|
||||
assignments_opt,
|
||||
})
|
||||
}
|
||||
|
||||
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
|
||||
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
|
||||
let exprs = extracted.exprs;
|
||||
let table_names = extracted.table_names;
|
||||
let assignments_opt = extracted.assignments_opt;
|
||||
|
||||
if let Some(assignments) = assignments_opt {
|
||||
self.assignment_parser(assignments)?;
|
||||
}
|
||||
|
||||
Ok(self.infer_shard_from_exprs(exprs, table_names))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn update_mutation_cache_on_write(&self, q: &Statement) {
|
||||
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
|
||||
debug!("Updating mutation cache on write");
|
||||
|
||||
let table_names = extracted.table_names;
|
||||
debug!("Table names in mutation query: {:?}", table_names);
|
||||
let table_mutations_cache = self.table_mutations_cache();
|
||||
for tables in table_names {
|
||||
for table in tables {
|
||||
table_mutations_cache.insert(self.table_mutation_cache_key(table), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// combines the database name and table name into a single string
|
||||
// to be used as the key in the table mutation cache
|
||||
// e.g. "mydb.mytable"
|
||||
fn table_mutation_cache_key(&self, table: Ident) -> String {
|
||||
format!("{}.{}", self.pool_settings.db, table.value)
|
||||
}
|
||||
|
||||
fn process_query(
|
||||
@@ -939,6 +1102,18 @@ impl QueryRouter {
|
||||
self.infer_shard_from_exprs(exprs, table_names)
|
||||
}
|
||||
|
||||
/// get table names from query
|
||||
fn table_names(&self, query: &sqlparser::ast::Query) -> Vec<Vec<Ident>> {
|
||||
let mut exprs = Vec::new();
|
||||
|
||||
let mut table_names = Vec::new();
|
||||
Self::process_query(query, &mut exprs, &mut table_names, &None);
|
||||
|
||||
debug!("Table names in query: {:?}", table_names);
|
||||
|
||||
table_names
|
||||
}
|
||||
|
||||
fn infer_shard_from_exprs(
|
||||
&mut self,
|
||||
exprs: Vec<Expr>,
|
||||
@@ -1106,6 +1281,7 @@ mod test {
|
||||
use crate::messages::simple_query;
|
||||
use crate::sharding::ShardingFunction;
|
||||
use bytes::BufMut;
|
||||
use serial_test::serial;
|
||||
|
||||
#[test]
|
||||
fn test_defaults() {
|
||||
@@ -1441,6 +1617,7 @@ mod test {
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||
checkout_failure_limit: None,
|
||||
shards: 2,
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
@@ -1461,6 +1638,10 @@ mod test {
|
||||
auth_query_password: None,
|
||||
auth_query_user: None,
|
||||
db: "test".to_string(),
|
||||
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
|
||||
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
|
||||
db_activity_ttl: PoolSettings::default().db_activity_ttl,
|
||||
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
|
||||
plugins: None,
|
||||
};
|
||||
let mut qr = QueryRouter::new();
|
||||
@@ -1519,6 +1700,7 @@ mod test {
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||
checkout_failure_limit: Some(10),
|
||||
shards: 5,
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
@@ -1539,6 +1721,10 @@ mod test {
|
||||
auth_query_password: None,
|
||||
auth_query_user: None,
|
||||
db: "test".to_string(),
|
||||
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
|
||||
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
|
||||
db_activity_ttl: PoolSettings::default().db_activity_ttl,
|
||||
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
|
||||
plugins: None,
|
||||
};
|
||||
|
||||
@@ -1954,4 +2140,150 @@ mod test {
|
||||
|
||||
assert_eq!(res, Ok(PluginOutput::Allow));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_db_activity_based_routing_initializing_state() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.db = "test_table_mutation_cache".to_string();
|
||||
|
||||
qr.database_activity_cache()
|
||||
.invalidate(&qr.pool_settings.db.clone());
|
||||
|
||||
let query = simple_query("SELECT * FROM some_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
|
||||
// Initially, the database activity should be in the "Initializing" state
|
||||
let state = qr.database_activity_state(&qr.pool_settings.db.clone());
|
||||
assert_eq!(state, DatabaseActivityState::Initializing);
|
||||
|
||||
// Check that the router chooses the primary role due to "Initializing" state
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_db_activity_based_routing_active_state() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.db = "test_table_mutation_cache".to_string();
|
||||
|
||||
let db_name = qr.pool_settings.db.clone();
|
||||
let cache = qr.database_activity_cache();
|
||||
cache.insert(db_name.clone(), DatabaseActivityState::Active);
|
||||
|
||||
let query = simple_query("SELECT * FROM some_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
|
||||
// Check that the router can choose a replica role when in "Active" state
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), None); // Default should allow replica due to active state
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_table_mutation_cache_on_write() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.table_mutation_cache_ms_ttl = 20_000; // 20 seconds in milliseconds
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.db = "test_table_mutation_cache".to_string();
|
||||
|
||||
qr.database_activity_cache()
|
||||
.invalidate(&qr.pool_settings.db.clone());
|
||||
|
||||
let query = simple_query("UPDATE some_table SET col1 = 'value' WHERE col2 = 1");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
|
||||
// Simulate the mutation query which should populate the mutation cache
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
let table_cache_key = qr.table_mutation_cache_key(Ident::new("some_table"));
|
||||
let cache = qr.table_mutations_cache();
|
||||
|
||||
// Ensure the table mutation cache contains the table with recent write
|
||||
assert!(cache.contains_key(&table_cache_key));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_db_activity_based_routing_multi_query() {
|
||||
use super::*;
|
||||
use crate::messages::simple_query;
|
||||
use tokio::time::Duration;
|
||||
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
|
||||
// Configure the pool settings for db_activity_based_routing
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.db = "test_db_activity_routing".to_string();
|
||||
|
||||
qr.database_activity_cache()
|
||||
.invalidate(&qr.pool_settings.db.clone());
|
||||
|
||||
// First query when database is initializing
|
||||
let query = simple_query("SELECT * FROM test_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to primary because database is initializing
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
// Wait for the initialization delay to pass
|
||||
tokio::time::sleep(Duration::from_millis(
|
||||
qr.pool_settings.db_activity_init_delay * 2,
|
||||
))
|
||||
.await;
|
||||
|
||||
// Next query after database is active
|
||||
let query = simple_query("SELECT * FROM test_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to replica because database is active and no recent mutations
|
||||
assert_eq!(qr.role(), None);
|
||||
|
||||
// Simulate a write query to update the mutation cache
|
||||
let query = simple_query("INSERT INTO test_table (id, name) VALUES (1, 'test')");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to primary because it's a write operation
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
// Immediately run a read query on the same table
|
||||
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to primary because the table was recently mutated
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
// Wait for the mutation cache TTL to expire
|
||||
tokio::time::sleep(Duration::from_millis(
|
||||
qr.pool_settings.table_mutation_cache_ms_ttl * 2,
|
||||
))
|
||||
.await;
|
||||
|
||||
// Run the read query again after cache expiration
|
||||
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to replica because mutation cache has expired
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:bullseye
|
||||
FROM rust:1.81.0-slim-bookworm
|
||||
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
RUN /bin/yj -h
|
||||
|
||||
@@ -188,6 +188,102 @@ describe "Miscellaneous" do
|
||||
end
|
||||
end
|
||||
|
||||
describe "Checkout failure limit" do
  # Reconfigures PgCat so that concurrent clients contend for one server
  # connection: `pool_size` is set to 1 and `connect_timeout` to 200
  # (presumably milliseconds, and presumably the checkout wait; confirm
  # against CONFIG.md). When `checkout_failure_limit` is given it is set on
  # the `sharded_db` pool; otherwise the key is left untouched so the
  # default applies.
  def apply_contention_config(checkout_failure_limit: nil)
    new_configs = processes.pgcat.current_config
    new_configs["general"]["connect_timeout"] = 200

    pool = new_configs["pools"]["sharded_db"]
    pool["users"]["0"]["pool_size"] = 1
    pool["checkout_failure_limit"] = checkout_failure_limit unless checkout_failure_limit.nil?

    processes.pgcat.update_config(new_configs)
    processes.pgcat.reload_config
    sleep 0.5
  end

  # Runs the given block in `count` concurrent threads, each with its own
  # client connection to `sharded_db`. The connection is always closed, even
  # when an expectation inside the block fails. Returns the block's value for
  # every thread; `Thread#value` joins the thread and re-raises anything it
  # raised, so failed expectations still fail the example.
  def with_concurrent_clients(count = 5, &block)
    Array.new(count) do
      Thread.new do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        begin
          block.call(conn)
        ensure
          conn.close
        end
      end
    end.map(&:value)
  end

  # Issues `attempts` slow queries on `conn` and expects the connection to
  # remain usable after each one, whether the query succeeded or raised
  # PG::SystemError (treated by these specs as a checkout failure).
  def expect_client_stays_connected(conn, attempts: 5)
    attempts.times do
      begin
        conn.async_exec("SELECT pg_sleep(0.5);")
      rescue PG::SystemError
        # Checkout failed; the client connection itself must survive.
      end
      expect(conn.status).to eq(PG::CONNECTION_OK)
    end
  end

  context "when no checkout failure limit is set" do
    before { apply_contention_config }

    it "does not disconnect client" do
      with_concurrent_clients { |conn| expect_client_stays_connected(conn) }
    end
  end

  context "when checkout failure limit is set high" do
    before { apply_contention_config(checkout_failure_limit: 10000) }

    it "does not disconnect client" do
      with_concurrent_clients { |conn| expect_client_stays_connected(conn) }
    end
  end

  context "when checkout failure limit is set low" do
    before { apply_contention_config(checkout_failure_limit: 2) }

    it "disconnects client after reaching limit" do
      disconnected = with_concurrent_clients do |conn|
        checkout_failure_count = 0
        dropped = false

        5.times do
          begin
            conn.async_exec("SELECT pg_sleep(1);")
            expect(conn.status).to eq(PG::CONNECTION_OK)
          rescue PG::SystemError
            checkout_failure_count += 1
            expect(conn.status).to eq(PG::CONNECTION_OK)
          rescue PG::ConnectionBad
            # The drop must come exactly after the configured limit (2) of
            # checkout failures, and the connection must be reported as bad.
            expect(checkout_failure_count).to eq(2)
            expect(conn.status).to eq(PG::CONNECTION_BAD)
            dropped = true
            break
          end
        end

        dropped
      end

      # Without this check the example would pass even if PgCat never
      # disconnected anyone.
      expect(disconnected).to include(true)
    end
  end
end
|
||||
|
||||
describe "Server version reporting" do
|
||||
it "reports correct version for normal and admin databases" do
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
Reference in New Issue
Block a user