mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 02:16:30 +00:00
Compare commits
16 Commits
levkk-more
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fb5eb58ae8 | ||
|
|
d63be9b93a | ||
|
|
100778670c | ||
|
|
37e3349c24 | ||
|
|
7f57a89d75 | ||
|
|
0898461c01 | ||
|
|
52b1b43850 | ||
|
|
0907f1b77f | ||
|
|
73260690b0 | ||
|
|
5056cbe8ed | ||
|
|
571b02e178 | ||
|
|
159eb89bf0 | ||
|
|
389993bf3e | ||
|
|
ba5243b6dd | ||
|
|
128ef72911 | ||
|
|
811885f464 |
48
Cargo.lock
generated
48
Cargo.lock
generated
@@ -250,6 +250,12 @@ dependencies = [
|
|||||||
"subtle",
|
"subtle",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "either"
|
||||||
|
version = "1.8.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "enum-as-inner"
|
name = "enum-as-inner"
|
||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
@@ -308,6 +314,12 @@ version = "0.2.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
|
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fallible-iterator"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fnv"
|
name = "fnv"
|
||||||
version = "1.0.7"
|
version = "1.0.7"
|
||||||
@@ -658,6 +670,15 @@ dependencies = [
|
|||||||
"windows-sys",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itertools"
|
||||||
|
version = "0.10.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
|
||||||
|
dependencies = [
|
||||||
|
"either",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.5"
|
version = "1.0.5"
|
||||||
@@ -882,7 +903,7 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.0.2-alpha1"
|
version = "1.0.2-alpha3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -893,10 +914,11 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"exitcode",
|
"exitcode",
|
||||||
"fallible-iterator",
|
"fallible-iterator 0.3.0",
|
||||||
"futures",
|
"futures",
|
||||||
"hmac",
|
"hmac",
|
||||||
"hyper",
|
"hyper",
|
||||||
|
"itertools",
|
||||||
"jemallocator",
|
"jemallocator",
|
||||||
"log",
|
"log",
|
||||||
"md-5",
|
"md-5",
|
||||||
@@ -1010,7 +1032,7 @@ dependencies = [
|
|||||||
"base64",
|
"base64",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"bytes",
|
"bytes",
|
||||||
"fallible-iterator",
|
"fallible-iterator 0.2.0",
|
||||||
"hmac",
|
"hmac",
|
||||||
"md-5",
|
"md-5",
|
||||||
"memchr",
|
"memchr",
|
||||||
@@ -1236,9 +1258,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_spanned"
|
name = "serde_spanned"
|
||||||
version = "0.6.1"
|
version = "0.6.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0efd8caf556a6cebd3b285caf480045fcc1ac04f6bd786b09a6f11af30c4fcf4"
|
checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
@@ -1512,9 +1534,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toml"
|
name = "toml"
|
||||||
version = "0.7.3"
|
version = "0.7.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b403acf6f2bb0859c93c7f0d967cb4a75a7ac552100f9322faf64dc047669b21"
|
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_spanned",
|
"serde_spanned",
|
||||||
@@ -1524,18 +1546,18 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toml_datetime"
|
name = "toml_datetime"
|
||||||
version = "0.6.1"
|
version = "0.6.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622"
|
checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toml_edit"
|
name = "toml_edit"
|
||||||
version = "0.19.6"
|
version = "0.19.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "08de71aa0d6e348f070457f85af8bd566e2bc452156a423ddf22861b3a953fae"
|
checksum = "92d964908cec0d030b812013af25a0e57fddfadb1e066ecc6681d86253129d4f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"serde",
|
"serde",
|
||||||
@@ -1889,9 +1911,9 @@ checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winnow"
|
name = "winnow"
|
||||||
version = "0.3.3"
|
version = "0.4.6"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "faf09497b8f8b5ac5d3bb4d05c0a99be20f26fd3d5f2db7b0716e946d5103658"
|
checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.0.2-alpha1"
|
version = "1.0.2-alpha3"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
@@ -38,13 +38,15 @@ socket2 = { version = "0.4.7", features = ["all"] }
|
|||||||
nix = "0.26.2"
|
nix = "0.26.2"
|
||||||
atomic_enum = "0.2.0"
|
atomic_enum = "0.2.0"
|
||||||
postgres-protocol = "0.6.5"
|
postgres-protocol = "0.6.5"
|
||||||
fallible-iterator = "0.2"
|
fallible-iterator = "0.3"
|
||||||
pin-project = "1"
|
pin-project = "1"
|
||||||
webpki-roots = "0.23"
|
webpki-roots = "0.23"
|
||||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
||||||
trust-dns-resolver = "0.22.0"
|
trust-dns-resolver = "0.22.0"
|
||||||
tokio-test = "0.4.2"
|
tokio-test = "0.4.2"
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
|
itertools = "0.10"
|
||||||
|
|
||||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||||
jemallocator = "0.5.0"
|
jemallocator = "0.5.0"
|
||||||
|
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ x-common-env-pg:
|
|||||||
|
|
||||||
services:
|
services:
|
||||||
main:
|
main:
|
||||||
image: kubernetes/pause
|
image: gcr.io/google_containers/pause:3.2
|
||||||
ports:
|
ports:
|
||||||
- 6432
|
- 6432
|
||||||
|
|
||||||
@@ -64,7 +64,7 @@ services:
|
|||||||
<<: *common-env-pg
|
<<: *common-env-pg
|
||||||
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
|
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
|
||||||
PGPORT: 10432
|
PGPORT: 10432
|
||||||
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
|
command: ["postgres", "-p", "10432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
|
||||||
|
|
||||||
toxiproxy:
|
toxiproxy:
|
||||||
build: .
|
build: .
|
||||||
|
|||||||
22
pgcat.minimal.toml
Normal file
22
pgcat.minimal.toml
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# This is an example of the most basic config
|
||||||
|
# that will mimic what PgBouncer does in transaction mode with one server.
|
||||||
|
|
||||||
|
[general]
|
||||||
|
|
||||||
|
host = "0.0.0.0"
|
||||||
|
port = 6433
|
||||||
|
admin_username = "pgcat"
|
||||||
|
admin_password = "pgcat"
|
||||||
|
|
||||||
|
[pools.pgml.users.0]
|
||||||
|
username = "postgres"
|
||||||
|
password = "postgres"
|
||||||
|
pool_size = 10
|
||||||
|
min_pool_size = 1
|
||||||
|
pool_mode = "transaction"
|
||||||
|
|
||||||
|
[pools.pgml.shards.0]
|
||||||
|
servers = [
|
||||||
|
["127.0.0.1", 28815, "primary"]
|
||||||
|
]
|
||||||
|
database = "postgres"
|
||||||
100
pgcat.toml
100
pgcat.toml
@@ -77,8 +77,57 @@ admin_username = "admin_user"
|
|||||||
# Password to access the virtual administrative database
|
# Password to access the virtual administrative database
|
||||||
admin_password = "admin_pass"
|
admin_password = "admin_pass"
|
||||||
|
|
||||||
# Plugins!!
|
# Default plugins that are configured on all pools.
|
||||||
# query_router_plugins = ["pg_table_access", "intercept"]
|
[plugins]
|
||||||
|
|
||||||
|
# Prewarmer plugin that runs queries on server startup, before giving the connection
|
||||||
|
# to the client.
|
||||||
|
[plugins.prewarmer]
|
||||||
|
enabled = false
|
||||||
|
queries = [
|
||||||
|
"SELECT pg_prewarm('pgbench_accounts')",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Log all queries to stdout.
|
||||||
|
[plugins.query_logger]
|
||||||
|
enabled = false
|
||||||
|
|
||||||
|
# Block access to tables that Postgres does not allow us to control.
|
||||||
|
[plugins.table_access]
|
||||||
|
enabled = false
|
||||||
|
tables = [
|
||||||
|
"pg_user",
|
||||||
|
"pg_roles",
|
||||||
|
"pg_database",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Intercept user queries and give a fake reply.
|
||||||
|
[plugins.intercept]
|
||||||
|
enabled = true
|
||||||
|
|
||||||
|
[plugins.intercept.queries.0]
|
||||||
|
|
||||||
|
query = "select current_database() as a, current_schemas(false) as b"
|
||||||
|
schema = [
|
||||||
|
["a", "text"],
|
||||||
|
["b", "text"],
|
||||||
|
]
|
||||||
|
result = [
|
||||||
|
["${DATABASE}", "{public}"],
|
||||||
|
]
|
||||||
|
|
||||||
|
[plugins.intercept.queries.1]
|
||||||
|
|
||||||
|
query = "select current_database(), current_schema(), current_user"
|
||||||
|
schema = [
|
||||||
|
["current_database", "text"],
|
||||||
|
["current_schema", "text"],
|
||||||
|
["current_user", "text"],
|
||||||
|
]
|
||||||
|
result = [
|
||||||
|
["${DATABASE}", "public", "${USER}"],
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
# pool configs are structured as pool.<pool_name>
|
# pool configs are structured as pool.<pool_name>
|
||||||
# the pool_name is what clients use as database name when connecting.
|
# the pool_name is what clients use as database name when connecting.
|
||||||
@@ -157,6 +206,53 @@ connect_timeout = 3000
|
|||||||
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
|
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
|
||||||
# dns_max_ttl = 30
|
# dns_max_ttl = 30
|
||||||
|
|
||||||
|
# Plugins can be configured on a pool-per-pool basis. This overrides the global plugins setting,
|
||||||
|
# so all plugins have to be configured here again.
|
||||||
|
[pool.sharded_db.plugins]
|
||||||
|
|
||||||
|
[pools.sharded_db.plugins.prewarmer]
|
||||||
|
enabled = true
|
||||||
|
queries = [
|
||||||
|
"SELECT pg_prewarm('pgbench_accounts')",
|
||||||
|
]
|
||||||
|
|
||||||
|
[pools.sharded_db.plugins.query_logger]
|
||||||
|
enabled = false
|
||||||
|
|
||||||
|
[pools.sharded_db.plugins.table_access]
|
||||||
|
enabled = false
|
||||||
|
tables = [
|
||||||
|
"pg_user",
|
||||||
|
"pg_roles",
|
||||||
|
"pg_database",
|
||||||
|
]
|
||||||
|
|
||||||
|
[pools.sharded_db.plugins.intercept]
|
||||||
|
enabled = true
|
||||||
|
|
||||||
|
[pools.sharded_db.plugins.intercept.queries.0]
|
||||||
|
|
||||||
|
query = "select current_database() as a, current_schemas(false) as b"
|
||||||
|
schema = [
|
||||||
|
["a", "text"],
|
||||||
|
["b", "text"],
|
||||||
|
]
|
||||||
|
result = [
|
||||||
|
["${DATABASE}", "{public}"],
|
||||||
|
]
|
||||||
|
|
||||||
|
[pools.sharded_db.plugins.intercept.queries.1]
|
||||||
|
|
||||||
|
query = "select current_database(), current_schema(), current_user"
|
||||||
|
schema = [
|
||||||
|
["current_database", "text"],
|
||||||
|
["current_schema", "text"],
|
||||||
|
["current_user", "text"],
|
||||||
|
]
|
||||||
|
result = [
|
||||||
|
["${DATABASE}", "public", "${USER}"],
|
||||||
|
]
|
||||||
|
|
||||||
# User configs are structured as pool.<pool_name>.users.<user_index>
|
# User configs are structured as pool.<pool_name>.users.<user_index>
|
||||||
# This section holds the credentials for users that may connect to this cluster
|
# This section holds the credentials for users that may connect to this cluster
|
||||||
[pools.sharded_db.users.0]
|
[pools.sharded_db.users.0]
|
||||||
|
|||||||
@@ -1313,7 +1313,7 @@ where
|
|||||||
.receive_server_message(server, &address, &pool, &self.stats.clone())
|
.receive_server_message(server, &address, &pool, &self.stats.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
match write_all_half(&mut self.write, &response).await {
|
match write_all_flush(&mut self.write, &response).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
server.mark_bad();
|
server.mark_bad();
|
||||||
@@ -1408,7 +1408,7 @@ where
|
|||||||
.receive_server_message(server, address, pool, client_stats)
|
.receive_server_message(server, address, pool, client_stats)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
match write_all_half(&mut self.write, &response).await {
|
match write_all_flush(&mut self.write, &response).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
server.mark_bad();
|
server.mark_bad();
|
||||||
|
|||||||
157
src/config.rs
157
src/config.rs
@@ -122,6 +122,16 @@ impl Default for Address {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Address {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"[address: {}:{}][database: {}][user: {}]",
|
||||||
|
self.host, self.port, self.database, self.username
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We need to implement PartialEq by ourselves so we skip stats in the comparison
|
// We need to implement PartialEq by ourselves so we skip stats in the comparison
|
||||||
impl PartialEq for Address {
|
impl PartialEq for Address {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
@@ -235,6 +245,8 @@ pub struct General {
|
|||||||
pub port: u16,
|
pub port: u16,
|
||||||
|
|
||||||
pub enable_prometheus_exporter: Option<bool>,
|
pub enable_prometheus_exporter: Option<bool>,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_prometheus_exporter_port")]
|
||||||
pub prometheus_exporter_port: i16,
|
pub prometheus_exporter_port: i16,
|
||||||
|
|
||||||
#[serde(default = "General::default_connect_timeout")]
|
#[serde(default = "General::default_connect_timeout")]
|
||||||
@@ -298,12 +310,13 @@ pub struct General {
|
|||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_validate_config")]
|
||||||
|
pub validate_config: bool,
|
||||||
|
|
||||||
// Support for auth query
|
// Support for auth query
|
||||||
pub auth_query: Option<String>,
|
pub auth_query: Option<String>,
|
||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
|
|
||||||
pub query_router_plugins: Option<Vec<String>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl General {
|
impl General {
|
||||||
@@ -369,6 +382,14 @@ impl General {
|
|||||||
pub fn default_idle_client_in_transaction_timeout() -> u64 {
|
pub fn default_idle_client_in_transaction_timeout() -> u64 {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn default_validate_config() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn default_prometheus_exporter_port() -> i16 {
|
||||||
|
9930
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for General {
|
impl Default for General {
|
||||||
@@ -404,7 +425,7 @@ impl Default for General {
|
|||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
server_lifetime: 1000 * 3600 * 24, // 24 hours,
|
server_lifetime: 1000 * 3600 * 24, // 24 hours,
|
||||||
query_router_plugins: None,
|
validate_config: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -457,6 +478,7 @@ pub struct Pool {
|
|||||||
#[serde(default = "Pool::default_load_balancing_mode")]
|
#[serde(default = "Pool::default_load_balancing_mode")]
|
||||||
pub load_balancing_mode: LoadBalancingMode,
|
pub load_balancing_mode: LoadBalancingMode,
|
||||||
|
|
||||||
|
#[serde(default = "Pool::default_default_role")]
|
||||||
pub default_role: String,
|
pub default_role: String,
|
||||||
|
|
||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
@@ -465,12 +487,18 @@ pub struct Pool {
|
|||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
pub primary_reads_enabled: bool,
|
pub primary_reads_enabled: bool,
|
||||||
|
|
||||||
|
/// Maximum time to allow for establishing a new server connection.
|
||||||
pub connect_timeout: Option<u64>,
|
pub connect_timeout: Option<u64>,
|
||||||
|
|
||||||
|
/// Close idle connections that have been opened for longer than this.
|
||||||
pub idle_timeout: Option<u64>,
|
pub idle_timeout: Option<u64>,
|
||||||
|
|
||||||
|
/// Close server connections that have been opened for longer than this.
|
||||||
|
/// Only applied to idle connections. If the connection is actively used for
|
||||||
|
/// longer than this period, the pool will not interrupt it.
|
||||||
pub server_lifetime: Option<u64>,
|
pub server_lifetime: Option<u64>,
|
||||||
|
|
||||||
|
#[serde(default = "Pool::default_sharding_function")]
|
||||||
pub sharding_function: ShardingFunction,
|
pub sharding_function: ShardingFunction,
|
||||||
|
|
||||||
#[serde(default = "Pool::default_automatic_sharding_key")]
|
#[serde(default = "Pool::default_automatic_sharding_key")]
|
||||||
@@ -484,6 +512,10 @@ pub struct Pool {
|
|||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
|
|
||||||
|
#[serde(default = "Pool::default_cleanup_server_connections")]
|
||||||
|
pub cleanup_server_connections: bool,
|
||||||
|
|
||||||
|
pub plugins: Option<Plugins>,
|
||||||
pub shards: BTreeMap<String, Shard>,
|
pub shards: BTreeMap<String, Shard>,
|
||||||
pub users: BTreeMap<String, User>,
|
pub users: BTreeMap<String, User>,
|
||||||
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
|
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
|
||||||
@@ -516,6 +548,18 @@ impl Pool {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn default_default_role() -> String {
|
||||||
|
"any".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn default_sharding_function() -> ShardingFunction {
|
||||||
|
ShardingFunction::PgBigintHash
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn default_cleanup_server_connections() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
pub fn validate(&mut self) -> Result<(), Error> {
|
pub fn validate(&mut self) -> Result<(), Error> {
|
||||||
match self.default_role.as_ref() {
|
match self.default_role.as_ref() {
|
||||||
"any" => (),
|
"any" => (),
|
||||||
@@ -604,6 +648,8 @@ impl Default for Pool {
|
|||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
server_lifetime: None,
|
server_lifetime: None,
|
||||||
|
plugins: None,
|
||||||
|
cleanup_server_connections: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -682,6 +728,76 @@ impl Default for Shard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
||||||
|
pub struct Plugins {
|
||||||
|
pub intercept: Option<Intercept>,
|
||||||
|
pub table_access: Option<TableAccess>,
|
||||||
|
pub query_logger: Option<QueryLogger>,
|
||||||
|
pub prewarmer: Option<Prewarmer>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Plugins {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}",
|
||||||
|
self.intercept.is_some(),
|
||||||
|
self.table_access.is_some(),
|
||||||
|
self.query_logger.is_some(),
|
||||||
|
self.prewarmer.is_some(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
||||||
|
pub struct Intercept {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub queries: BTreeMap<String, Query>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
||||||
|
pub struct TableAccess {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub tables: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
||||||
|
pub struct QueryLogger {
|
||||||
|
pub enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
||||||
|
pub struct Prewarmer {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub queries: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Intercept {
|
||||||
|
pub fn substitute(&mut self, db: &str, user: &str) {
|
||||||
|
for (_, query) in self.queries.iter_mut() {
|
||||||
|
query.substitute(db, user);
|
||||||
|
query.query = query.query.to_ascii_lowercase();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
||||||
|
pub struct Query {
|
||||||
|
pub query: String,
|
||||||
|
pub schema: Vec<Vec<String>>,
|
||||||
|
pub result: Vec<Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Query {
|
||||||
|
pub fn substitute(&mut self, db: &str, user: &str) {
|
||||||
|
for col in self.result.iter_mut() {
|
||||||
|
for i in 0..col.len() {
|
||||||
|
col[i] = col[i].replace("${USER}", user).replace("${DATABASE}", db);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Configuration wrapper.
|
/// Configuration wrapper.
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -699,7 +815,13 @@ pub struct Config {
|
|||||||
#[serde(default = "Config::default_path")]
|
#[serde(default = "Config::default_path")]
|
||||||
pub path: String,
|
pub path: String,
|
||||||
|
|
||||||
|
// General and global settings.
|
||||||
pub general: General,
|
pub general: General,
|
||||||
|
|
||||||
|
// Plugins that should run in all pools.
|
||||||
|
pub plugins: Option<Plugins>,
|
||||||
|
|
||||||
|
// Connection pools.
|
||||||
pub pools: HashMap<String, Pool>,
|
pub pools: HashMap<String, Pool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -737,6 +859,7 @@ impl Default for Config {
|
|||||||
path: Self::default_path(),
|
path: Self::default_path(),
|
||||||
general: General::default(),
|
general: General::default(),
|
||||||
pools: HashMap::default(),
|
pools: HashMap::default(),
|
||||||
|
plugins: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -883,6 +1006,13 @@ impl Config {
|
|||||||
"Server TLS certificate verification: {}",
|
"Server TLS certificate verification: {}",
|
||||||
self.general.verify_server_certificate
|
self.general.verify_server_certificate
|
||||||
);
|
);
|
||||||
|
info!(
|
||||||
|
"Plugins: {}",
|
||||||
|
match self.plugins {
|
||||||
|
Some(ref plugins) => plugins.to_string(),
|
||||||
|
None => "not configured".into(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
for (pool_name, pool_config) in &self.pools {
|
for (pool_name, pool_config) in &self.pools {
|
||||||
// TODO: Make this output prettier (maybe a table?)
|
// TODO: Make this output prettier (maybe a table?)
|
||||||
@@ -949,6 +1079,18 @@ impl Config {
|
|||||||
None => "default".to_string(),
|
None => "default".to_string(),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
info!(
|
||||||
|
"[pool: {}] Cleanup server connections: {}",
|
||||||
|
pool_name, pool_config.cleanup_server_connections
|
||||||
|
);
|
||||||
|
info!(
|
||||||
|
"[pool: {}] Plugins: {}",
|
||||||
|
pool_name,
|
||||||
|
match pool_config.plugins {
|
||||||
|
Some(ref plugins) => plugins.to_string(),
|
||||||
|
None => "not configured".into(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
for user in &pool_config.users {
|
for user in &pool_config.users {
|
||||||
info!(
|
info!(
|
||||||
@@ -1128,6 +1270,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
|||||||
|
|
||||||
pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, Error> {
|
pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, Error> {
|
||||||
let old_config = get_config();
|
let old_config = get_config();
|
||||||
|
|
||||||
match parse(&old_config.path).await {
|
match parse(&old_config.path).await {
|
||||||
Ok(()) => (),
|
Ok(()) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -1135,18 +1278,18 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E
|
|||||||
return Err(Error::BadConfig);
|
return Err(Error::BadConfig);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_config = get_config();
|
let new_config = get_config();
|
||||||
|
|
||||||
match CachedResolver::from_config().await {
|
match CachedResolver::from_config().await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => error!("DNS cache reinitialization error: {:?}", err),
|
Err(err) => error!("DNS cache reinitialization error: {:?}", err),
|
||||||
};
|
};
|
||||||
|
|
||||||
if old_config.pools != new_config.pools {
|
if old_config != new_config {
|
||||||
info!("Pool configuration changed");
|
info!("Config changed, reloading");
|
||||||
ConnectionPool::from_config(client_server_map).await?;
|
ConnectionPool::from_config(client_server_map).await?;
|
||||||
Ok(true)
|
Ok(true)
|
||||||
} else if old_config != new_config {
|
|
||||||
Ok(true)
|
|
||||||
} else {
|
} else {
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,6 +43,8 @@ impl MirroredClient {
|
|||||||
ClientServerMap::default(),
|
ClientServerMap::default(),
|
||||||
Arc::new(PoolStats::new(identifier, cfg.clone())),
|
Arc::new(PoolStats::new(identifier, cfg.clone())),
|
||||||
Arc::new(RwLock::new(None)),
|
Arc::new(RwLock::new(None)),
|
||||||
|
None,
|
||||||
|
true,
|
||||||
);
|
);
|
||||||
|
|
||||||
Pool::builder()
|
Pool::builder()
|
||||||
|
|||||||
@@ -2,41 +2,21 @@
|
|||||||
//!
|
//!
|
||||||
//! It intercepts queries and returns fake results.
|
//! It intercepts queries and returns fake results.
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{BufMut, BytesMut};
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::{json, Value};
|
|
||||||
use sqlparser::ast::Statement;
|
use sqlparser::ast::Statement;
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
config::Intercept as InterceptConfig,
|
||||||
errors::Error,
|
errors::Error,
|
||||||
messages::{command_complete, data_row_nullable, row_description, DataType},
|
messages::{command_complete, data_row_nullable, row_description, DataType},
|
||||||
plugins::{Plugin, PluginOutput},
|
plugins::{Plugin, PluginOutput},
|
||||||
pool::{PoolIdentifier, PoolMap},
|
|
||||||
query_router::QueryRouter,
|
query_router::QueryRouter,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub static CONFIG: Lazy<ArcSwap<HashMap<PoolIdentifier, Value>>> =
|
|
||||||
Lazy::new(|| ArcSwap::from_pointee(HashMap::new()));
|
|
||||||
|
|
||||||
/// Configure the intercept plugin.
|
|
||||||
pub fn configure(pools: &PoolMap) {
|
|
||||||
let mut config = HashMap::new();
|
|
||||||
for (identifier, _) in pools.iter() {
|
|
||||||
// TODO: make this configurable from a text config.
|
|
||||||
let value = fool_datagrip(&identifier.db, &identifier.user);
|
|
||||||
config.insert(identifier.clone(), value);
|
|
||||||
}
|
|
||||||
|
|
||||||
CONFIG.store(Arc::new(config));
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: use these structs for deserialization
|
// TODO: use these structs for deserialization
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub struct Rule {
|
pub struct Rule {
|
||||||
@@ -52,45 +32,47 @@ pub struct Column {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The intercept plugin.
|
/// The intercept plugin.
|
||||||
pub struct Intercept;
|
pub struct Intercept<'a> {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub config: &'a InterceptConfig,
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Plugin for Intercept {
|
impl<'a> Plugin for Intercept<'a> {
|
||||||
async fn run(
|
async fn run(
|
||||||
&mut self,
|
&mut self,
|
||||||
query_router: &QueryRouter,
|
query_router: &QueryRouter,
|
||||||
ast: &Vec<Statement>,
|
ast: &Vec<Statement>,
|
||||||
) -> Result<PluginOutput, Error> {
|
) -> Result<PluginOutput, Error> {
|
||||||
if ast.is_empty() {
|
if !self.enabled || ast.is_empty() {
|
||||||
return Ok(PluginOutput::Allow);
|
return Ok(PluginOutput::Allow);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut result = BytesMut::new();
|
let mut config = self.config.clone();
|
||||||
let query_map = match CONFIG.load().get(&PoolIdentifier::new(
|
config.substitute(
|
||||||
&query_router.pool_settings().db,
|
&query_router.pool_settings().db,
|
||||||
&query_router.pool_settings().user.username,
|
&query_router.pool_settings().user.username,
|
||||||
)) {
|
);
|
||||||
Some(query_map) => query_map.clone(),
|
|
||||||
None => return Ok(PluginOutput::Allow),
|
let mut result = BytesMut::new();
|
||||||
};
|
|
||||||
|
|
||||||
for q in ast {
|
for q in ast {
|
||||||
// Normalization
|
// Normalization
|
||||||
let q = q.to_string().to_ascii_lowercase();
|
let q = q.to_string().to_ascii_lowercase();
|
||||||
|
|
||||||
for target in query_map.as_array().unwrap().iter() {
|
for (_, target) in config.queries.iter() {
|
||||||
if target["query"].as_str().unwrap() == q {
|
if target.query.as_str() == q {
|
||||||
debug!("Query matched: {}", q);
|
debug!("Intercepting query: {}", q);
|
||||||
|
|
||||||
let rd = target["schema"]
|
let rd = target
|
||||||
.as_array()
|
.schema
|
||||||
.unwrap()
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|row| {
|
.map(|row| {
|
||||||
let row = row.as_object().unwrap();
|
let name = &row[0];
|
||||||
|
let data_type = &row[1];
|
||||||
(
|
(
|
||||||
row["name"].as_str().unwrap(),
|
name.as_str(),
|
||||||
match row["data_type"].as_str().unwrap() {
|
match data_type.as_str() {
|
||||||
"text" => DataType::Text,
|
"text" => DataType::Text,
|
||||||
"anyarray" => DataType::AnyArray,
|
"anyarray" => DataType::AnyArray,
|
||||||
"oid" => DataType::Oid,
|
"oid" => DataType::Oid,
|
||||||
@@ -104,13 +86,11 @@ impl Plugin for Intercept {
|
|||||||
|
|
||||||
result.put(row_description(&rd));
|
result.put(row_description(&rd));
|
||||||
|
|
||||||
target["result"].as_array().unwrap().iter().for_each(|row| {
|
target.result.iter().for_each(|row| {
|
||||||
let row = row
|
let row = row
|
||||||
.as_array()
|
|
||||||
.unwrap()
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
let s = s.as_str().unwrap().to_string();
|
let s = s.as_str().to_string();
|
||||||
|
|
||||||
if s == "" {
|
if s == "" {
|
||||||
None
|
None
|
||||||
@@ -138,141 +118,3 @@ impl Plugin for Intercept {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Make IntelliJ SQL plugin believe it's talking to an actual database
|
|
||||||
/// instead of PgCat.
|
|
||||||
fn fool_datagrip(database: &str, user: &str) -> Value {
|
|
||||||
json!([
|
|
||||||
{
|
|
||||||
"query": "select current_database() as a, current_schemas(false) as b",
|
|
||||||
"schema": [
|
|
||||||
{
|
|
||||||
"name": "a",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "b",
|
|
||||||
"data_type": "anyarray",
|
|
||||||
},
|
|
||||||
],
|
|
||||||
|
|
||||||
"result": [
|
|
||||||
[database, "{public}"],
|
|
||||||
],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"query": "select current_database(), current_schema(), current_user",
|
|
||||||
"schema": [
|
|
||||||
{
|
|
||||||
"name": "current_database",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "current_schema",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "current_user",
|
|
||||||
"data_type": "text",
|
|
||||||
}
|
|
||||||
],
|
|
||||||
|
|
||||||
"result": [
|
|
||||||
["sharded_db", "public", "sharding_user"],
|
|
||||||
],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"query": "select cast(n.oid as bigint) as id, datname as name, d.description, datistemplate as is_template, datallowconn as allow_connections, pg_catalog.pg_get_userbyid(n.datdba) as \"owner\" from pg_catalog.pg_database as n left join pg_catalog.pg_shdescription as d on n.oid = d.objoid order by case when datname = pg_catalog.current_database() then -cast(1 as bigint) else cast(n.oid as bigint) end",
|
|
||||||
"schema": [
|
|
||||||
{
|
|
||||||
"name": "id",
|
|
||||||
"data_type": "oid",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "name",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "description",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "is_template",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "allow_connections",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "owner",
|
|
||||||
"data_type": "text",
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"result": [
|
|
||||||
["16387", database, "", "f", "t", user],
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"query": "select cast(r.oid as bigint) as role_id, rolname as role_name, rolsuper as is_super, rolinherit as is_inherit, rolcreaterole as can_createrole, rolcreatedb as can_createdb, rolcanlogin as can_login, rolreplication as is_replication, rolconnlimit as conn_limit, rolvaliduntil as valid_until, rolbypassrls as bypass_rls, rolconfig as config, d.description from pg_catalog.pg_roles as r left join pg_catalog.pg_shdescription as d on d.objoid = r.oid",
|
|
||||||
"schema": [
|
|
||||||
{
|
|
||||||
"name": "role_id",
|
|
||||||
"data_type": "oid",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "role_name",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "is_super",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "is_inherit",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "can_createrole",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "can_createdb",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "can_login",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "is_replication",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "conn_limit",
|
|
||||||
"data_type": "int4",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "valid_until",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "bypass_rls",
|
|
||||||
"data_type": "bool",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "config",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "description",
|
|
||||||
"data_type": "text",
|
|
||||||
},
|
|
||||||
],
|
|
||||||
"result": [
|
|
||||||
["10", "postgres", "f", "t", "f", "f", "t", "f", "-1", "", "f", "", ""],
|
|
||||||
["16419", user, "f", "t", "f", "f", "t", "f", "-1", "", "f", "", ""],
|
|
||||||
]
|
|
||||||
}
|
|
||||||
])
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -9,6 +9,8 @@
|
|||||||
//!
|
//!
|
||||||
|
|
||||||
pub mod intercept;
|
pub mod intercept;
|
||||||
|
pub mod prewarmer;
|
||||||
|
pub mod query_logger;
|
||||||
pub mod table_access;
|
pub mod table_access;
|
||||||
|
|
||||||
use crate::{errors::Error, query_router::QueryRouter};
|
use crate::{errors::Error, query_router::QueryRouter};
|
||||||
@@ -17,6 +19,7 @@ use bytes::BytesMut;
|
|||||||
use sqlparser::ast::Statement;
|
use sqlparser::ast::Statement;
|
||||||
|
|
||||||
pub use intercept::Intercept;
|
pub use intercept::Intercept;
|
||||||
|
pub use query_logger::QueryLogger;
|
||||||
pub use table_access::TableAccess;
|
pub use table_access::TableAccess;
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
@@ -29,12 +32,13 @@ pub enum PluginOutput {
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait Plugin {
|
pub trait Plugin {
|
||||||
// Custom output is allowed because we want to extend this system
|
// Run before the query is sent to the server.
|
||||||
// to rewriting queries some day. So an output of a plugin could be
|
|
||||||
// a rewritten AST.
|
|
||||||
async fn run(
|
async fn run(
|
||||||
&mut self,
|
&mut self,
|
||||||
query_router: &QueryRouter,
|
query_router: &QueryRouter,
|
||||||
ast: &Vec<Statement>,
|
ast: &Vec<Statement>,
|
||||||
) -> Result<PluginOutput, Error>;
|
) -> Result<PluginOutput, Error>;
|
||||||
|
|
||||||
|
// TODO: run after the result is returned
|
||||||
|
// async fn callback(&mut self, query_router: &QueryRouter);
|
||||||
}
|
}
|
||||||
|
|||||||
28
src/plugins/prewarmer.rs
Normal file
28
src/plugins/prewarmer.rs
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
//! Prewarm new connections before giving them to the client.
|
||||||
|
use crate::{errors::Error, server::Server};
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
pub struct Prewarmer<'a> {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub server: &'a mut Server,
|
||||||
|
pub queries: &'a Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Prewarmer<'a> {
|
||||||
|
pub async fn run(&mut self) -> Result<(), Error> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
for query in self.queries {
|
||||||
|
info!(
|
||||||
|
"{} Prewarning with query: `{}`",
|
||||||
|
self.server.address(),
|
||||||
|
query
|
||||||
|
);
|
||||||
|
self.server.query(&query).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
38
src/plugins/query_logger.rs
Normal file
38
src/plugins/query_logger.rs
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
//! Log all queries to stdout (or somewhere else, why not).
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
errors::Error,
|
||||||
|
plugins::{Plugin, PluginOutput},
|
||||||
|
query_router::QueryRouter,
|
||||||
|
};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use log::info;
|
||||||
|
use sqlparser::ast::Statement;
|
||||||
|
|
||||||
|
pub struct QueryLogger<'a> {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub user: &'a str,
|
||||||
|
pub db: &'a str,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<'a> Plugin for QueryLogger<'a> {
|
||||||
|
async fn run(
|
||||||
|
&mut self,
|
||||||
|
_query_router: &QueryRouter,
|
||||||
|
ast: &Vec<Statement>,
|
||||||
|
) -> Result<PluginOutput, Error> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(PluginOutput::Allow);
|
||||||
|
}
|
||||||
|
|
||||||
|
let query = ast
|
||||||
|
.iter()
|
||||||
|
.map(|q| q.to_string())
|
||||||
|
.collect::<Vec<String>>()
|
||||||
|
.join("; ");
|
||||||
|
info!("[pool: {}][user: {}] {}", self.user, self.db, query);
|
||||||
|
|
||||||
|
Ok(PluginOutput::Allow)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -10,19 +10,26 @@ use crate::{
|
|||||||
query_router::QueryRouter,
|
query_router::QueryRouter,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use log::debug;
|
||||||
|
|
||||||
use core::ops::ControlFlow;
|
use core::ops::ControlFlow;
|
||||||
|
|
||||||
pub struct TableAccess {
|
pub struct TableAccess<'a> {
|
||||||
pub forbidden_tables: Vec<String>,
|
pub enabled: bool,
|
||||||
|
pub tables: &'a Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Plugin for TableAccess {
|
impl<'a> Plugin for TableAccess<'a> {
|
||||||
async fn run(
|
async fn run(
|
||||||
&mut self,
|
&mut self,
|
||||||
_query_router: &QueryRouter,
|
_query_router: &QueryRouter,
|
||||||
ast: &Vec<Statement>,
|
ast: &Vec<Statement>,
|
||||||
) -> Result<PluginOutput, Error> {
|
) -> Result<PluginOutput, Error> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(PluginOutput::Allow);
|
||||||
|
}
|
||||||
|
|
||||||
let mut found = None;
|
let mut found = None;
|
||||||
|
|
||||||
visit_relations(ast, |relation| {
|
visit_relations(ast, |relation| {
|
||||||
@@ -30,7 +37,7 @@ impl Plugin for TableAccess {
|
|||||||
let parts = relation.split(".").collect::<Vec<&str>>();
|
let parts = relation.split(".").collect::<Vec<&str>>();
|
||||||
let table_name = parts.last().unwrap();
|
let table_name = parts.last().unwrap();
|
||||||
|
|
||||||
if self.forbidden_tables.contains(&table_name.to_string()) {
|
if self.tables.contains(&table_name.to_string()) {
|
||||||
found = Some(table_name.to_string());
|
found = Some(table_name.to_string());
|
||||||
ControlFlow::<()>::Break(())
|
ControlFlow::<()>::Break(())
|
||||||
} else {
|
} else {
|
||||||
@@ -39,6 +46,8 @@ impl Plugin for TableAccess {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if let Some(found) = found {
|
if let Some(found) = found {
|
||||||
|
debug!("Blocking access to table \"{}\"", found);
|
||||||
|
|
||||||
Ok(PluginOutput::Deny(format!(
|
Ok(PluginOutput::Deny(format!(
|
||||||
"permission for table \"{}\" denied",
|
"permission for table \"{}\" denied",
|
||||||
found
|
found
|
||||||
|
|||||||
101
src/pool.rs
101
src/pool.rs
@@ -17,10 +17,13 @@ use std::sync::{
|
|||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User};
|
use crate::config::{
|
||||||
|
get_config, Address, General, LoadBalancingMode, Plugins, PoolMode, Role, User,
|
||||||
|
};
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
|
|
||||||
use crate::auth_passthrough::AuthPassthrough;
|
use crate::auth_passthrough::AuthPassthrough;
|
||||||
|
use crate::plugins::prewarmer;
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
use crate::sharding::ShardingFunction;
|
use crate::sharding::ShardingFunction;
|
||||||
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
|
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
|
||||||
@@ -133,7 +136,8 @@ pub struct PoolSettings {
|
|||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
|
|
||||||
pub plugins: Option<Vec<String>>,
|
/// Plugins
|
||||||
|
pub plugins: Option<Plugins>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for PoolSettings {
|
impl Default for PoolSettings {
|
||||||
@@ -198,6 +202,7 @@ pub struct ConnectionPool {
|
|||||||
paused: Arc<AtomicBool>,
|
paused: Arc<AtomicBool>,
|
||||||
paused_waiter: Arc<Notify>,
|
paused_waiter: Arc<Notify>,
|
||||||
|
|
||||||
|
/// Statistics.
|
||||||
pub stats: Arc<PoolStats>,
|
pub stats: Arc<PoolStats>,
|
||||||
|
|
||||||
/// AuthInfo
|
/// AuthInfo
|
||||||
@@ -355,6 +360,11 @@ impl ConnectionPool {
|
|||||||
client_server_map.clone(),
|
client_server_map.clone(),
|
||||||
pool_stats.clone(),
|
pool_stats.clone(),
|
||||||
pool_auth_hash.clone(),
|
pool_auth_hash.clone(),
|
||||||
|
match pool_config.plugins {
|
||||||
|
Some(ref plugins) => Some(plugins.clone()),
|
||||||
|
None => config.plugins.clone(),
|
||||||
|
},
|
||||||
|
pool_config.cleanup_server_connections,
|
||||||
);
|
);
|
||||||
|
|
||||||
let connect_timeout = match pool_config.connect_timeout {
|
let connect_timeout = match pool_config.connect_timeout {
|
||||||
@@ -380,7 +390,10 @@ impl ConnectionPool {
|
|||||||
.min()
|
.min()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
debug!("Pool reaper rate: {}ms", reaper_rate);
|
debug!(
|
||||||
|
"[pool: {}][user: {}] Pool reaper rate: {}ms",
|
||||||
|
pool_name, user.username, reaper_rate
|
||||||
|
);
|
||||||
|
|
||||||
let pool = Pool::builder()
|
let pool = Pool::builder()
|
||||||
.max_size(user.pool_size)
|
.max_size(user.pool_size)
|
||||||
@@ -389,9 +402,13 @@ impl ConnectionPool {
|
|||||||
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
|
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
|
||||||
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
|
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
|
||||||
.reaper_rate(std::time::Duration::from_millis(reaper_rate))
|
.reaper_rate(std::time::Duration::from_millis(reaper_rate))
|
||||||
.test_on_check_out(false)
|
.test_on_check_out(false);
|
||||||
.build(manager)
|
|
||||||
.await?;
|
let pool = if config.general.validate_config {
|
||||||
|
pool.build(manager).await?
|
||||||
|
} else {
|
||||||
|
pool.build_unchecked(manager)
|
||||||
|
};
|
||||||
|
|
||||||
pools.push(pool);
|
pools.push(pool);
|
||||||
servers.push(address);
|
servers.push(address);
|
||||||
@@ -453,7 +470,10 @@ impl ConnectionPool {
|
|||||||
auth_query: pool_config.auth_query.clone(),
|
auth_query: pool_config.auth_query.clone(),
|
||||||
auth_query_user: pool_config.auth_query_user.clone(),
|
auth_query_user: pool_config.auth_query_user.clone(),
|
||||||
auth_query_password: pool_config.auth_query_password.clone(),
|
auth_query_password: pool_config.auth_query_password.clone(),
|
||||||
plugins: config.general.query_router_plugins.clone(),
|
plugins: match pool_config.plugins {
|
||||||
|
Some(ref plugins) => Some(plugins.clone()),
|
||||||
|
None => config.plugins.clone(),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
validated: Arc::new(AtomicBool::new(false)),
|
validated: Arc::new(AtomicBool::new(false)),
|
||||||
paused: Arc::new(AtomicBool::new(false)),
|
paused: Arc::new(AtomicBool::new(false)),
|
||||||
@@ -463,23 +483,18 @@ impl ConnectionPool {
|
|||||||
// Connect to the servers to make sure pool configuration is valid
|
// Connect to the servers to make sure pool configuration is valid
|
||||||
// before setting it globally.
|
// before setting it globally.
|
||||||
// Do this async and somewhere else, we don't have to wait here.
|
// Do this async and somewhere else, we don't have to wait here.
|
||||||
let mut validate_pool = pool.clone();
|
if config.general.validate_config {
|
||||||
tokio::task::spawn(async move {
|
let mut validate_pool = pool.clone();
|
||||||
let _ = validate_pool.validate().await;
|
tokio::task::spawn(async move {
|
||||||
});
|
let _ = validate_pool.validate().await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// There is one pool per database/user pair.
|
// There is one pool per database/user pair.
|
||||||
new_pools.insert(PoolIdentifier::new(pool_name, &user.username), pool);
|
new_pools.insert(PoolIdentifier::new(pool_name, &user.username), pool);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize plugins here if required.
|
|
||||||
if let Some(plugins) = config.general.query_router_plugins {
|
|
||||||
if plugins.contains(&String::from("intercept")) {
|
|
||||||
crate::plugins::intercept::configure(&new_pools);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
POOLS.store(Arc::new(new_pools.clone()));
|
POOLS.store(Arc::new(new_pools.clone()));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -624,7 +639,10 @@ impl ConnectionPool {
|
|||||||
{
|
{
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Banning instance {:?}, error: {:?}", address, err);
|
error!(
|
||||||
|
"Connection checkout error for instance {:?}, error: {:?}",
|
||||||
|
address, err
|
||||||
|
);
|
||||||
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
|
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
|
||||||
address.stats.error();
|
address.stats.error();
|
||||||
client_stats.idle();
|
client_stats.idle();
|
||||||
@@ -700,7 +718,7 @@ impl ConnectionPool {
|
|||||||
// Health check failed.
|
// Health check failed.
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Banning instance {:?} because of failed health check, {:?}",
|
"Failed health check on instance {:?}, error: {:?}",
|
||||||
address, err
|
address, err
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -709,7 +727,7 @@ impl ConnectionPool {
|
|||||||
// Health check timed out.
|
// Health check timed out.
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Banning instance {:?} because of health check timeout, {:?}",
|
"Health check timeout on instance {:?}, error: {:?}",
|
||||||
address, err
|
address, err
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -731,13 +749,16 @@ impl ConnectionPool {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
error!("Banning instance {:?}, reason: {:?}", address, reason);
|
||||||
|
|
||||||
let now = chrono::offset::Utc::now().naive_utc();
|
let now = chrono::offset::Utc::now().naive_utc();
|
||||||
let mut guard = self.banlist.write();
|
let mut guard = self.banlist.write();
|
||||||
error!("Banning {:?}", address);
|
|
||||||
if let Some(client_info) = client_info {
|
if let Some(client_info) = client_info {
|
||||||
client_info.ban_error();
|
client_info.ban_error();
|
||||||
address.stats.error();
|
address.stats.error();
|
||||||
}
|
}
|
||||||
|
|
||||||
guard[address.shard].insert(address.clone(), (reason, now));
|
guard[address.shard].insert(address.clone(), (reason, now));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -894,12 +915,29 @@ impl ConnectionPool {
|
|||||||
|
|
||||||
/// Wrapper for the bb8 connection pool.
|
/// Wrapper for the bb8 connection pool.
|
||||||
pub struct ServerPool {
|
pub struct ServerPool {
|
||||||
|
/// Server address.
|
||||||
address: Address,
|
address: Address,
|
||||||
|
|
||||||
|
/// Server Postgres user.
|
||||||
user: User,
|
user: User,
|
||||||
|
|
||||||
|
/// Server database.
|
||||||
database: String,
|
database: String,
|
||||||
|
|
||||||
|
/// Client/server mapping.
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
|
|
||||||
|
/// Server statistics.
|
||||||
stats: Arc<PoolStats>,
|
stats: Arc<PoolStats>,
|
||||||
|
|
||||||
|
/// Server auth hash (for auth passthrough).
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
|
|
||||||
|
/// Server plugins.
|
||||||
|
plugins: Option<Plugins>,
|
||||||
|
|
||||||
|
/// Should we clean up dirty connections before putting them into the pool?
|
||||||
|
cleanup_connections: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerPool {
|
impl ServerPool {
|
||||||
@@ -910,6 +948,8 @@ impl ServerPool {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
stats: Arc<PoolStats>,
|
stats: Arc<PoolStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
|
plugins: Option<Plugins>,
|
||||||
|
cleanup_connections: bool,
|
||||||
) -> ServerPool {
|
) -> ServerPool {
|
||||||
ServerPool {
|
ServerPool {
|
||||||
address,
|
address,
|
||||||
@@ -918,6 +958,8 @@ impl ServerPool {
|
|||||||
client_server_map,
|
client_server_map,
|
||||||
stats,
|
stats,
|
||||||
auth_hash,
|
auth_hash,
|
||||||
|
plugins,
|
||||||
|
cleanup_connections,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -947,10 +989,23 @@ impl ManageConnection for ServerPool {
|
|||||||
self.client_server_map.clone(),
|
self.client_server_map.clone(),
|
||||||
stats.clone(),
|
stats.clone(),
|
||||||
self.auth_hash.clone(),
|
self.auth_hash.clone(),
|
||||||
|
self.cleanup_connections,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(conn) => {
|
Ok(mut conn) => {
|
||||||
|
if let Some(ref plugins) = self.plugins {
|
||||||
|
if let Some(ref prewarmer) = plugins.prewarmer {
|
||||||
|
let mut prewarmer = prewarmer::Prewarmer {
|
||||||
|
enabled: prewarmer.enabled,
|
||||||
|
server: &mut conn,
|
||||||
|
queries: &prewarmer.queries,
|
||||||
|
};
|
||||||
|
|
||||||
|
prewarmer.run().await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
stats.idle();
|
stats.idle();
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ use sqlparser::parser::Parser;
|
|||||||
use crate::config::Role;
|
use crate::config::Role;
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::messages::BytesMutReader;
|
use crate::messages::BytesMutReader;
|
||||||
use crate::plugins::{Intercept, Plugin, PluginOutput, TableAccess};
|
use crate::plugins::{Intercept, Plugin, PluginOutput, QueryLogger, TableAccess};
|
||||||
use crate::pool::PoolSettings;
|
use crate::pool::PoolSettings;
|
||||||
use crate::sharding::Sharder;
|
use crate::sharding::Sharder;
|
||||||
|
|
||||||
@@ -790,24 +790,44 @@ impl QueryRouter {
|
|||||||
|
|
||||||
/// Add your plugins here and execute them.
|
/// Add your plugins here and execute them.
|
||||||
pub async fn execute_plugins(&self, ast: &Vec<Statement>) -> Result<PluginOutput, Error> {
|
pub async fn execute_plugins(&self, ast: &Vec<Statement>) -> Result<PluginOutput, Error> {
|
||||||
if let Some(plugins) = &self.pool_settings.plugins {
|
let plugins = match self.pool_settings.plugins {
|
||||||
if plugins.contains(&String::from("intercept")) {
|
Some(ref plugins) => plugins,
|
||||||
let mut intercept = Intercept {};
|
None => return Ok(PluginOutput::Allow),
|
||||||
let result = intercept.run(&self, ast).await;
|
};
|
||||||
|
|
||||||
if let Ok(PluginOutput::Intercept(output)) = result {
|
if let Some(ref query_logger) = plugins.query_logger {
|
||||||
return Ok(PluginOutput::Intercept(output));
|
let mut query_logger = QueryLogger {
|
||||||
}
|
enabled: query_logger.enabled,
|
||||||
|
user: &self.pool_settings.user.username,
|
||||||
|
db: &self.pool_settings.db,
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = query_logger.run(&self, ast).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ref intercept) = plugins.intercept {
|
||||||
|
let mut intercept = Intercept {
|
||||||
|
enabled: intercept.enabled,
|
||||||
|
config: &intercept,
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = intercept.run(&self, ast).await;
|
||||||
|
|
||||||
|
if let Ok(PluginOutput::Intercept(output)) = result {
|
||||||
|
return Ok(PluginOutput::Intercept(output));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if plugins.contains(&String::from("pg_table_access")) {
|
if let Some(ref table_access) = plugins.table_access {
|
||||||
let mut table_access = TableAccess {
|
let mut table_access = TableAccess {
|
||||||
forbidden_tables: vec![String::from("pg_database"), String::from("pg_roles")],
|
enabled: table_access.enabled,
|
||||||
};
|
tables: &table_access.tables,
|
||||||
|
};
|
||||||
|
|
||||||
if let Ok(PluginOutput::Deny(error)) = table_access.run(&self, ast).await {
|
let result = table_access.run(&self, ast).await;
|
||||||
return Ok(PluginOutput::Deny(error));
|
|
||||||
}
|
if let Ok(PluginOutput::Deny(error)) = result {
|
||||||
|
return Ok(PluginOutput::Deny(error));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1233,6 +1253,7 @@ mod test {
|
|||||||
db: "test".to_string(),
|
db: "test".to_string(),
|
||||||
plugins: None,
|
plugins: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
qr.update_pool_settings(pool_settings.clone());
|
qr.update_pool_settings(pool_settings.clone());
|
||||||
|
|
||||||
@@ -1376,12 +1397,24 @@ mod test {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_table_access_plugin() {
|
async fn test_table_access_plugin() {
|
||||||
|
use crate::config::{Plugins, TableAccess};
|
||||||
|
let table_access = TableAccess {
|
||||||
|
enabled: true,
|
||||||
|
tables: vec![String::from("pg_database")],
|
||||||
|
};
|
||||||
|
let plugins = Plugins {
|
||||||
|
table_access: Some(table_access),
|
||||||
|
intercept: None,
|
||||||
|
query_logger: None,
|
||||||
|
prewarmer: None,
|
||||||
|
};
|
||||||
|
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
|
let mut pool_settings = PoolSettings::default();
|
||||||
|
pool_settings.query_parser_enabled = true;
|
||||||
|
pool_settings.plugins = Some(plugins);
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
|
|
||||||
let mut pool_settings = PoolSettings::default();
|
|
||||||
pool_settings.plugins = Some(vec![String::from("pg_table_access")]);
|
|
||||||
qr.update_pool_settings(pool_settings);
|
qr.update_pool_settings(pool_settings);
|
||||||
|
|
||||||
let query = simple_query("SELECT * FROM pg_database");
|
let query = simple_query("SELECT * FROM pg_database");
|
||||||
@@ -1396,4 +1429,17 @@ mod test {
|
|||||||
))
|
))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_plugins_disabled_by_defaault() {
|
||||||
|
QueryRouter::setup();
|
||||||
|
let qr = QueryRouter::new();
|
||||||
|
|
||||||
|
let query = simple_query("SELECT * FROM pg_database");
|
||||||
|
let ast = QueryRouter::parse(&query).unwrap();
|
||||||
|
|
||||||
|
let res = qr.execute_plugins(&ast).await;
|
||||||
|
|
||||||
|
assert_eq!(res, Ok(PluginOutput::Allow));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -103,6 +103,48 @@ impl StreamInner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone)]
|
||||||
|
struct CleanupState {
|
||||||
|
/// If server connection requires DISCARD ALL before checkin because of set statement
|
||||||
|
needs_cleanup_set: bool,
|
||||||
|
|
||||||
|
/// If server connection requires DISCARD ALL before checkin because of prepare statement
|
||||||
|
needs_cleanup_prepare: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CleanupState {
|
||||||
|
fn new() -> Self {
|
||||||
|
CleanupState {
|
||||||
|
needs_cleanup_set: false,
|
||||||
|
needs_cleanup_prepare: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn needs_cleanup(&self) -> bool {
|
||||||
|
self.needs_cleanup_set || self.needs_cleanup_prepare
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_true(&mut self) {
|
||||||
|
self.needs_cleanup_set = true;
|
||||||
|
self.needs_cleanup_prepare = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&mut self) {
|
||||||
|
self.needs_cleanup_set = false;
|
||||||
|
self.needs_cleanup_prepare = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for CleanupState {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"SET: {}, PREPARE: {}",
|
||||||
|
self.needs_cleanup_set, self.needs_cleanup_prepare
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Server state.
|
/// Server state.
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
/// Server host, e.g. localhost,
|
/// Server host, e.g. localhost,
|
||||||
@@ -131,8 +173,8 @@ pub struct Server {
|
|||||||
/// Is the server broken? We'll remote it from the pool if so.
|
/// Is the server broken? We'll remote it from the pool if so.
|
||||||
bad: bool,
|
bad: bool,
|
||||||
|
|
||||||
/// If server connection requires a DISCARD ALL before checkin
|
/// If server connection requires DISCARD ALL before checkin
|
||||||
needs_cleanup: bool,
|
cleanup_state: CleanupState,
|
||||||
|
|
||||||
/// Mapping of clients and servers used for query cancellation.
|
/// Mapping of clients and servers used for query cancellation.
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
@@ -146,13 +188,16 @@ pub struct Server {
|
|||||||
/// Application name using the server at the moment.
|
/// Application name using the server at the moment.
|
||||||
application_name: String,
|
application_name: String,
|
||||||
|
|
||||||
// Last time that a successful server send or response happened
|
/// Last time that a successful server send or response happened
|
||||||
last_activity: SystemTime,
|
last_activity: SystemTime,
|
||||||
|
|
||||||
mirror_manager: Option<MirroringManager>,
|
mirror_manager: Option<MirroringManager>,
|
||||||
|
|
||||||
// Associated addresses used
|
/// Associated addresses used
|
||||||
addr_set: Option<AddrSet>,
|
addr_set: Option<AddrSet>,
|
||||||
|
|
||||||
|
/// Should clean up dirty connections?
|
||||||
|
cleanup_connections: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
@@ -165,6 +210,7 @@ impl Server {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
stats: Arc<ServerStats>,
|
stats: Arc<ServerStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
|
cleanup_connections: bool,
|
||||||
) -> Result<Server, Error> {
|
) -> Result<Server, Error> {
|
||||||
let cached_resolver = CACHED_RESOLVER.load();
|
let cached_resolver = CACHED_RESOLVER.load();
|
||||||
let mut addr_set: Option<AddrSet> = None;
|
let mut addr_set: Option<AddrSet> = None;
|
||||||
@@ -630,7 +676,7 @@ impl Server {
|
|||||||
in_transaction: false,
|
in_transaction: false,
|
||||||
data_available: false,
|
data_available: false,
|
||||||
bad: false,
|
bad: false,
|
||||||
needs_cleanup: false,
|
cleanup_state: CleanupState::new(),
|
||||||
client_server_map,
|
client_server_map,
|
||||||
addr_set,
|
addr_set,
|
||||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||||
@@ -645,6 +691,7 @@ impl Server {
|
|||||||
address.mirrors.clone(),
|
address.mirrors.clone(),
|
||||||
)),
|
)),
|
||||||
},
|
},
|
||||||
|
cleanup_connections,
|
||||||
};
|
};
|
||||||
|
|
||||||
server.set_name("pgcat").await?;
|
server.set_name("pgcat").await?;
|
||||||
@@ -705,7 +752,10 @@ impl Server {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Terminating server because of: {:?}", err);
|
error!(
|
||||||
|
"Terminating server {:?} because of: {:?}",
|
||||||
|
self.address, err
|
||||||
|
);
|
||||||
self.bad = true;
|
self.bad = true;
|
||||||
Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
@@ -720,7 +770,10 @@ impl Server {
|
|||||||
let mut message = match read_message(&mut self.stream).await {
|
let mut message = match read_message(&mut self.stream).await {
|
||||||
Ok(message) => message,
|
Ok(message) => message,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Terminating server because of: {:?}", err);
|
error!(
|
||||||
|
"Terminating server {:?} because of: {:?}",
|
||||||
|
self.address, err
|
||||||
|
);
|
||||||
self.bad = true;
|
self.bad = true;
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
@@ -787,12 +840,12 @@ impl Server {
|
|||||||
// This will reduce amount of discard statements sent
|
// This will reduce amount of discard statements sent
|
||||||
if !self.in_transaction {
|
if !self.in_transaction {
|
||||||
debug!("Server connection marked for clean up");
|
debug!("Server connection marked for clean up");
|
||||||
self.needs_cleanup = true;
|
self.cleanup_state.needs_cleanup_set = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"PREPARE\0" => {
|
"PREPARE\0" => {
|
||||||
debug!("Server connection marked for clean up");
|
debug!("Server connection marked for clean up");
|
||||||
self.needs_cleanup = true;
|
self.cleanup_state.needs_cleanup_prepare = true;
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
@@ -922,6 +975,8 @@ impl Server {
|
|||||||
/// It will use the simple query protocol.
|
/// It will use the simple query protocol.
|
||||||
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
||||||
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
|
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
|
||||||
|
debug!("Running `{}` on server {:?}", query, self.address);
|
||||||
|
|
||||||
let query = simple_query(query);
|
let query = simple_query(query);
|
||||||
|
|
||||||
self.send(&query).await?;
|
self.send(&query).await?;
|
||||||
@@ -954,10 +1009,11 @@ impl Server {
|
|||||||
// to avoid leaking state between clients. For performance reasons we only
|
// to avoid leaking state between clients. For performance reasons we only
|
||||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||||
// it before each checkin.
|
// it before each checkin.
|
||||||
if self.needs_cleanup {
|
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
|
||||||
warn!("Server returned with session state altered, discarding state");
|
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
|
||||||
self.query("DISCARD ALL").await?;
|
self.query("DISCARD ALL").await?;
|
||||||
self.needs_cleanup = false;
|
self.query("RESET ROLE").await?;
|
||||||
|
self.cleanup_state.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -969,12 +1025,12 @@ impl Server {
|
|||||||
self.application_name = name.to_string();
|
self.application_name = name.to_string();
|
||||||
// We don't want `SET application_name` to mark the server connection
|
// We don't want `SET application_name` to mark the server connection
|
||||||
// as needing cleanup
|
// as needing cleanup
|
||||||
let needs_cleanup_before = self.needs_cleanup;
|
let needs_cleanup_before = self.cleanup_state;
|
||||||
|
|
||||||
let result = Ok(self
|
let result = Ok(self
|
||||||
.query(&format!("SET application_name = '{}'", name))
|
.query(&format!("SET application_name = '{}'", name))
|
||||||
.await?);
|
.await?);
|
||||||
self.needs_cleanup = needs_cleanup_before;
|
self.cleanup_state = needs_cleanup_before;
|
||||||
result
|
result
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -999,7 +1055,7 @@ impl Server {
|
|||||||
|
|
||||||
// Marks a connection as needing DISCARD ALL at checkin
|
// Marks a connection as needing DISCARD ALL at checkin
|
||||||
pub fn mark_dirty(&mut self) {
|
pub fn mark_dirty(&mut self) {
|
||||||
self.needs_cleanup = true;
|
self.cleanup_state.set_true();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mirror_send(&mut self, bytes: &BytesMut) {
|
pub fn mirror_send(&mut self, bytes: &BytesMut) {
|
||||||
@@ -1033,6 +1089,7 @@ impl Server {
|
|||||||
client_server_map,
|
client_server_map,
|
||||||
Arc::new(ServerStats::default()),
|
Arc::new(ServerStats::default()),
|
||||||
Arc::new(RwLock::new(None)),
|
Arc::new(RwLock::new(None)),
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
debug!("Connected!, sending query.");
|
debug!("Connected!, sending query.");
|
||||||
@@ -1135,14 +1192,18 @@ impl Drop for Server {
|
|||||||
_ => debug!("Dirty shutdown"),
|
_ => debug!("Dirty shutdown"),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Should not matter.
|
|
||||||
self.bad = true;
|
|
||||||
|
|
||||||
let now = chrono::offset::Utc::now().naive_utc();
|
let now = chrono::offset::Utc::now().naive_utc();
|
||||||
let duration = now - self.connected_at;
|
let duration = now - self.connected_at;
|
||||||
|
|
||||||
|
let message = if self.bad {
|
||||||
|
"Server connection terminated"
|
||||||
|
} else {
|
||||||
|
"Server connection closed"
|
||||||
|
};
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Server connection closed {:?}, session duration: {}",
|
"{} {:?}, session duration: {}",
|
||||||
|
message,
|
||||||
self.address,
|
self.address,
|
||||||
crate::format_duration(&duration)
|
crate::format_duration(&duration)
|
||||||
);
|
);
|
||||||
|
|||||||
16
src/stats.rs
16
src/stats.rs
@@ -107,8 +107,20 @@ impl Collector {
|
|||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
||||||
for stats in SERVER_STATS.read().values() {
|
// Hold read lock for duration of update to retain all server stats
|
||||||
stats.address_stats().update_averages();
|
let server_stats = SERVER_STATS.read();
|
||||||
|
|
||||||
|
for stats in server_stats.values() {
|
||||||
|
if !stats.check_address_stat_average_is_updated_status() {
|
||||||
|
stats.address_stats().update_averages();
|
||||||
|
stats.address_stats().reset_current_counts();
|
||||||
|
stats.set_address_stat_average_is_updated_status(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset to false for next update
|
||||||
|
for stats in server_stats.values() {
|
||||||
|
stats.set_address_stat_average_is_updated_status(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,26 +1,29 @@
|
|||||||
use log::warn;
|
|
||||||
use std::sync::atomic::*;
|
use std::sync::atomic::*;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct AddressStatFields {
|
||||||
|
xact_count: Arc<AtomicU64>,
|
||||||
|
query_count: Arc<AtomicU64>,
|
||||||
|
bytes_received: Arc<AtomicU64>,
|
||||||
|
bytes_sent: Arc<AtomicU64>,
|
||||||
|
xact_time: Arc<AtomicU64>,
|
||||||
|
query_time: Arc<AtomicU64>,
|
||||||
|
wait_time: Arc<AtomicU64>,
|
||||||
|
errors: Arc<AtomicU64>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Internal address stats
|
/// Internal address stats
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct AddressStats {
|
pub struct AddressStats {
|
||||||
pub total_xact_count: Arc<AtomicU64>,
|
total: AddressStatFields,
|
||||||
pub total_query_count: Arc<AtomicU64>,
|
|
||||||
pub total_received: Arc<AtomicU64>,
|
current: AddressStatFields,
|
||||||
pub total_sent: Arc<AtomicU64>,
|
|
||||||
pub total_xact_time: Arc<AtomicU64>,
|
averages: AddressStatFields,
|
||||||
pub total_query_time: Arc<AtomicU64>,
|
|
||||||
pub total_wait_time: Arc<AtomicU64>,
|
// Determines if the averages have been updated since the last time they were reported
|
||||||
pub total_errors: Arc<AtomicU64>,
|
pub averages_updated: Arc<AtomicBool>,
|
||||||
pub avg_query_count: Arc<AtomicU64>,
|
|
||||||
pub avg_query_time: Arc<AtomicU64>,
|
|
||||||
pub avg_recv: Arc<AtomicU64>,
|
|
||||||
pub avg_sent: Arc<AtomicU64>,
|
|
||||||
pub avg_errors: Arc<AtomicU64>,
|
|
||||||
pub avg_xact_time: Arc<AtomicU64>,
|
|
||||||
pub avg_xact_count: Arc<AtomicU64>,
|
|
||||||
pub avg_wait_time: Arc<AtomicU64>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoIterator for AddressStats {
|
impl IntoIterator for AddressStats {
|
||||||
@@ -31,67 +34,67 @@ impl IntoIterator for AddressStats {
|
|||||||
vec![
|
vec![
|
||||||
(
|
(
|
||||||
"total_xact_count".to_string(),
|
"total_xact_count".to_string(),
|
||||||
self.total_xact_count.load(Ordering::Relaxed),
|
self.total.xact_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_query_count".to_string(),
|
"total_query_count".to_string(),
|
||||||
self.total_query_count.load(Ordering::Relaxed),
|
self.total.query_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_received".to_string(),
|
"total_received".to_string(),
|
||||||
self.total_received.load(Ordering::Relaxed),
|
self.total.bytes_received.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_sent".to_string(),
|
"total_sent".to_string(),
|
||||||
self.total_sent.load(Ordering::Relaxed),
|
self.total.bytes_sent.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_xact_time".to_string(),
|
"total_xact_time".to_string(),
|
||||||
self.total_xact_time.load(Ordering::Relaxed),
|
self.total.xact_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_query_time".to_string(),
|
"total_query_time".to_string(),
|
||||||
self.total_query_time.load(Ordering::Relaxed),
|
self.total.query_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_wait_time".to_string(),
|
"total_wait_time".to_string(),
|
||||||
self.total_wait_time.load(Ordering::Relaxed),
|
self.total.wait_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_errors".to_string(),
|
"total_errors".to_string(),
|
||||||
self.total_errors.load(Ordering::Relaxed),
|
self.total.errors.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_xact_count".to_string(),
|
"avg_xact_count".to_string(),
|
||||||
self.avg_xact_count.load(Ordering::Relaxed),
|
self.averages.xact_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_query_count".to_string(),
|
"avg_query_count".to_string(),
|
||||||
self.avg_query_count.load(Ordering::Relaxed),
|
self.averages.query_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_recv".to_string(),
|
"avg_recv".to_string(),
|
||||||
self.avg_recv.load(Ordering::Relaxed),
|
self.averages.bytes_received.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_sent".to_string(),
|
"avg_sent".to_string(),
|
||||||
self.avg_sent.load(Ordering::Relaxed),
|
self.averages.bytes_sent.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_errors".to_string(),
|
"avg_errors".to_string(),
|
||||||
self.avg_errors.load(Ordering::Relaxed),
|
self.averages.errors.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_xact_time".to_string(),
|
"avg_xact_time".to_string(),
|
||||||
self.avg_xact_time.load(Ordering::Relaxed),
|
self.averages.xact_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_query_time".to_string(),
|
"avg_query_time".to_string(),
|
||||||
self.avg_query_time.load(Ordering::Relaxed),
|
self.averages.query_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_wait_time".to_string(),
|
"avg_wait_time".to_string(),
|
||||||
self.avg_wait_time.load(Ordering::Relaxed),
|
self.averages.wait_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@@ -99,22 +102,120 @@ impl IntoIterator for AddressStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AddressStats {
|
impl AddressStats {
|
||||||
|
pub fn xact_count_add(&self) {
|
||||||
|
self.total.xact_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.current.xact_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn query_count_add(&self) {
|
||||||
|
self.total.query_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.current.query_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bytes_received_add(&self, bytes: u64) {
|
||||||
|
self.total
|
||||||
|
.bytes_received
|
||||||
|
.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
self.current
|
||||||
|
.bytes_received
|
||||||
|
.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bytes_sent_add(&self, bytes: u64) {
|
||||||
|
self.total.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
self.current.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn xact_time_add(&self, time: u64) {
|
||||||
|
self.total.xact_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
self.current.xact_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn query_time_add(&self, time: u64) {
|
||||||
|
self.total.query_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
self.current.query_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wait_time_add(&self, time: u64) {
|
||||||
|
self.total.wait_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
self.current.wait_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn error(&self) {
|
pub fn error(&self) {
|
||||||
self.total_errors.fetch_add(1, Ordering::Relaxed);
|
self.total.errors.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.current.errors.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_averages(&self) {
|
pub fn update_averages(&self) {
|
||||||
let (totals, averages) = self.fields_iterators();
|
let stat_period_per_second = crate::stats::STAT_PERIOD / 1_000;
|
||||||
for data in totals.iter().zip(averages.iter()) {
|
|
||||||
let (total, average) = data;
|
// xact_count
|
||||||
if let Err(err) = average.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
|
let current_xact_count = self.current.xact_count.load(Ordering::Relaxed);
|
||||||
let total = total.load(Ordering::Relaxed);
|
let current_xact_time = self.current.xact_time.load(Ordering::Relaxed);
|
||||||
let avg = (total - avg) / (crate::stats::STAT_PERIOD / 1_000); // Avg / second
|
self.averages.xact_count.store(
|
||||||
Some(avg)
|
current_xact_count / stat_period_per_second,
|
||||||
}) {
|
Ordering::Relaxed,
|
||||||
warn!("Could not update averages for addresses stats, {:?}", err);
|
);
|
||||||
}
|
if current_xact_count == 0 {
|
||||||
|
self.averages.xact_time.store(0, Ordering::Relaxed);
|
||||||
|
} else {
|
||||||
|
self.averages
|
||||||
|
.xact_time
|
||||||
|
.store(current_xact_time / current_xact_count, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// query_count
|
||||||
|
let current_query_count = self.current.query_count.load(Ordering::Relaxed);
|
||||||
|
let current_query_time = self.current.query_time.load(Ordering::Relaxed);
|
||||||
|
self.averages.query_count.store(
|
||||||
|
current_query_count / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
if current_query_count == 0 {
|
||||||
|
self.averages.query_time.store(0, Ordering::Relaxed);
|
||||||
|
} else {
|
||||||
|
self.averages
|
||||||
|
.query_time
|
||||||
|
.store(current_query_time / current_query_count, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
// bytes_received
|
||||||
|
let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed);
|
||||||
|
self.averages.bytes_received.store(
|
||||||
|
current_bytes_received / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
|
||||||
|
// bytes_sent
|
||||||
|
let current_bytes_sent = self.current.bytes_sent.load(Ordering::Relaxed);
|
||||||
|
self.averages.bytes_sent.store(
|
||||||
|
current_bytes_sent / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
|
||||||
|
// wait_time
|
||||||
|
let current_wait_time = self.current.wait_time.load(Ordering::Relaxed);
|
||||||
|
self.averages.wait_time.store(
|
||||||
|
current_wait_time / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
|
||||||
|
// errors
|
||||||
|
let current_errors = self.current.errors.load(Ordering::Relaxed);
|
||||||
|
self.averages
|
||||||
|
.errors
|
||||||
|
.store(current_errors / stat_period_per_second, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reset_current_counts(&self) {
|
||||||
|
self.current.xact_count.store(0, Ordering::Relaxed);
|
||||||
|
self.current.xact_time.store(0, Ordering::Relaxed);
|
||||||
|
self.current.query_count.store(0, Ordering::Relaxed);
|
||||||
|
self.current.query_time.store(0, Ordering::Relaxed);
|
||||||
|
self.current.bytes_received.store(0, Ordering::Relaxed);
|
||||||
|
self.current.bytes_sent.store(0, Ordering::Relaxed);
|
||||||
|
self.current.wait_time.store(0, Ordering::Relaxed);
|
||||||
|
self.current.errors.store(0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn populate_row(&self, row: &mut Vec<String>) {
|
pub fn populate_row(&self, row: &mut Vec<String>) {
|
||||||
@@ -122,28 +223,4 @@ impl AddressStats {
|
|||||||
row.push(value.to_string());
|
row.push(value.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fields_iterators(&self) -> (Vec<Arc<AtomicU64>>, Vec<Arc<AtomicU64>>) {
|
|
||||||
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
|
|
||||||
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
|
|
||||||
|
|
||||||
totals.push(self.total_xact_count.clone());
|
|
||||||
averages.push(self.avg_xact_count.clone());
|
|
||||||
totals.push(self.total_query_count.clone());
|
|
||||||
averages.push(self.avg_query_count.clone());
|
|
||||||
totals.push(self.total_received.clone());
|
|
||||||
averages.push(self.avg_recv.clone());
|
|
||||||
totals.push(self.total_sent.clone());
|
|
||||||
averages.push(self.avg_sent.clone());
|
|
||||||
totals.push(self.total_xact_time.clone());
|
|
||||||
averages.push(self.avg_xact_time.clone());
|
|
||||||
totals.push(self.total_query_time.clone());
|
|
||||||
averages.push(self.avg_query_time.clone());
|
|
||||||
totals.push(self.total_wait_time.clone());
|
|
||||||
averages.push(self.avg_wait_time.clone());
|
|
||||||
totals.push(self.total_errors.clone());
|
|
||||||
averages.push(self.avg_errors.clone());
|
|
||||||
|
|
||||||
(totals, averages)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -139,6 +139,17 @@ impl ServerStats {
|
|||||||
self.address.stats.clone()
|
self.address.stats.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn check_address_stat_average_is_updated_status(&self) -> bool {
|
||||||
|
self.address.stats.averages_updated.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_address_stat_average_is_updated_status(&self, is_checked: bool) {
|
||||||
|
self.address
|
||||||
|
.stats
|
||||||
|
.averages_updated
|
||||||
|
.store(is_checked, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
// Helper methods for show_servers
|
// Helper methods for show_servers
|
||||||
pub fn pool_name(&self) -> String {
|
pub fn pool_name(&self) -> String {
|
||||||
self.pool_stats.database()
|
self.pool_stats.database()
|
||||||
@@ -166,12 +177,9 @@ impl ServerStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn checkout_time(&self, microseconds: u64, application_name: String) {
|
pub fn checkout_time(&self, microseconds: u64, application_name: String) {
|
||||||
// Update server stats and address aggergation stats
|
// Update server stats and address aggregation stats
|
||||||
self.set_application(application_name);
|
self.set_application(application_name);
|
||||||
self.address
|
self.address.stats.wait_time_add(microseconds);
|
||||||
.stats
|
|
||||||
.total_wait_time
|
|
||||||
.fetch_add(microseconds, Ordering::Relaxed);
|
|
||||||
self.pool_stats
|
self.pool_stats
|
||||||
.maxwait
|
.maxwait
|
||||||
.fetch_max(microseconds, Ordering::Relaxed);
|
.fetch_max(microseconds, Ordering::Relaxed);
|
||||||
@@ -180,13 +188,8 @@ impl ServerStats {
|
|||||||
/// Report a query executed by a client against a server
|
/// Report a query executed by a client against a server
|
||||||
pub fn query(&self, milliseconds: u64, application_name: &str) {
|
pub fn query(&self, milliseconds: u64, application_name: &str) {
|
||||||
self.set_application(application_name.to_string());
|
self.set_application(application_name.to_string());
|
||||||
let address_stats = self.address_stats();
|
self.address.stats.query_count_add();
|
||||||
address_stats
|
self.address.stats.query_time_add(milliseconds);
|
||||||
.total_query_count
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
address_stats
|
|
||||||
.total_query_time
|
|
||||||
.fetch_add(milliseconds, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a transaction executed by a client a server
|
/// Report a transaction executed by a client a server
|
||||||
@@ -197,29 +200,20 @@ impl ServerStats {
|
|||||||
self.set_application(application_name.to_string());
|
self.set_application(application_name.to_string());
|
||||||
|
|
||||||
self.transaction_count.fetch_add(1, Ordering::Relaxed);
|
self.transaction_count.fetch_add(1, Ordering::Relaxed);
|
||||||
self.address
|
self.address.stats.xact_count_add();
|
||||||
.stats
|
|
||||||
.total_xact_count
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server
|
/// Report data sent to a server
|
||||||
pub fn data_sent(&self, amount_bytes: usize) {
|
pub fn data_sent(&self, amount_bytes: usize) {
|
||||||
self.bytes_sent
|
self.bytes_sent
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
||||||
self.address
|
self.address.stats.bytes_sent_add(amount_bytes as u64);
|
||||||
.stats
|
|
||||||
.total_sent
|
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data received from a server
|
/// Report data received from a server
|
||||||
pub fn data_received(&self, amount_bytes: usize) {
|
pub fn data_received(&self, amount_bytes: usize) {
|
||||||
self.bytes_received
|
self.bytes_received
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
||||||
self.address
|
self.address.stats.bytes_received_add(amount_bytes as u64);
|
||||||
.stats
|
|
||||||
.total_received
|
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,11 +14,12 @@ describe "Admin" do
|
|||||||
describe "SHOW STATS" do
|
describe "SHOW STATS" do
|
||||||
context "clients connect and make one query" do
|
context "clients connect and make one query" do
|
||||||
it "updates *_query_time and *_wait_time" do
|
it "updates *_query_time and *_wait_time" do
|
||||||
connection = PG::connect("#{pgcat_conn_str}?application_name=one_query")
|
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||||
connection.async_exec("SELECT pg_sleep(0.25)")
|
connections.each do |c|
|
||||||
connection.async_exec("SELECT pg_sleep(0.25)")
|
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
|
||||||
connection.async_exec("SELECT pg_sleep(0.25)")
|
end
|
||||||
connection.close
|
sleep(1)
|
||||||
|
connections.map(&:close)
|
||||||
|
|
||||||
# wait for averages to be calculated, we shouldn't do this too often
|
# wait for averages to be calculated, we shouldn't do this too often
|
||||||
sleep(15.5)
|
sleep(15.5)
|
||||||
@@ -26,7 +27,7 @@ describe "Admin" do
|
|||||||
results = admin_conn.async_exec("SHOW STATS")[0]
|
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||||
admin_conn.close
|
admin_conn.close
|
||||||
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
||||||
expect(results["avg_query_time"].to_i).to_not eq(0)
|
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
|
||||||
|
|
||||||
expect(results["total_wait_time"].to_i).to_not eq(0)
|
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||||
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||||
@@ -71,15 +72,17 @@ describe "Admin" do
|
|||||||
|
|
||||||
context "client connects but issues no queries" do
|
context "client connects but issues no queries" do
|
||||||
it "only affects cl_idle stats" do
|
it "only affects cl_idle stats" do
|
||||||
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
|
|
||||||
|
before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
|
||||||
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
||||||
sleep(1)
|
sleep(1)
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
expect(results["cl_idle"]).to eq("20")
|
expect(results["cl_idle"]).to eq("20")
|
||||||
expect(results["sv_idle"]).to eq("1")
|
expect(results["sv_idle"]).to eq(before_test)
|
||||||
|
|
||||||
connections.map(&:close)
|
connections.map(&:close)
|
||||||
sleep(1.1)
|
sleep(1.1)
|
||||||
@@ -87,7 +90,7 @@ describe "Admin" do
|
|||||||
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
expect(results["sv_idle"]).to eq("1")
|
expect(results["sv_idle"]).to eq(before_test)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ module Helpers
|
|||||||
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
|
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
|
||||||
|
|
||||||
pgcat_cfg = pgcat.current_config
|
pgcat_cfg = pgcat.current_config
|
||||||
pgcat_cfg["general"]["query_router_plugins"] = ["intercept"]
|
|
||||||
pgcat_cfg["pools"] = {
|
pgcat_cfg["pools"] = {
|
||||||
"#{pool_name}" => {
|
"#{pool_name}" => {
|
||||||
"default_role" => "any",
|
"default_role" => "any",
|
||||||
@@ -42,7 +41,24 @@ module Helpers
|
|||||||
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
||||||
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
||||||
},
|
},
|
||||||
"users" => { "0" => user }
|
"users" => { "0" => user },
|
||||||
|
"plugins" => {
|
||||||
|
"intercept" => {
|
||||||
|
"enabled" => true,
|
||||||
|
"queries" => {
|
||||||
|
"0" => {
|
||||||
|
"query" => "select current_database() as a, current_schemas(false) as b",
|
||||||
|
"schema" => [
|
||||||
|
["a", "text"],
|
||||||
|
["b", "text"],
|
||||||
|
],
|
||||||
|
"result" => [
|
||||||
|
["${DATABASE}", "{public}"],
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pgcat.update_config(pgcat_cfg)
|
pgcat.update_config(pgcat_cfg)
|
||||||
@@ -102,7 +118,7 @@ module Helpers
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
|
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info", pool_settings={})
|
||||||
user = {
|
user = {
|
||||||
"password" => "sharding_user",
|
"password" => "sharding_user",
|
||||||
"pool_size" => pool_size,
|
"pool_size" => pool_size,
|
||||||
@@ -118,28 +134,32 @@ module Helpers
|
|||||||
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
|
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
|
||||||
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
|
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
|
||||||
|
|
||||||
|
pool_config = {
|
||||||
|
"default_role" => "any",
|
||||||
|
"pool_mode" => pool_mode,
|
||||||
|
"load_balancing_mode" => lb_mode,
|
||||||
|
"primary_reads_enabled" => false,
|
||||||
|
"query_parser_enabled" => false,
|
||||||
|
"sharding_function" => "pg_bigint_hash",
|
||||||
|
"shards" => {
|
||||||
|
"0" => {
|
||||||
|
"database" => "shard0",
|
||||||
|
"servers" => [
|
||||||
|
["localhost", primary.port.to_s, "primary"],
|
||||||
|
["localhost", replica0.port.to_s, "replica"],
|
||||||
|
["localhost", replica1.port.to_s, "replica"],
|
||||||
|
["localhost", replica2.port.to_s, "replica"]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"users" => { "0" => user }
|
||||||
|
}
|
||||||
|
|
||||||
|
pool_config = pool_config.merge(pool_settings)
|
||||||
|
|
||||||
# Main proxy configs
|
# Main proxy configs
|
||||||
pgcat_cfg["pools"] = {
|
pgcat_cfg["pools"] = {
|
||||||
"#{pool_name}" => {
|
"#{pool_name}" => pool_config,
|
||||||
"default_role" => "any",
|
|
||||||
"pool_mode" => pool_mode,
|
|
||||||
"load_balancing_mode" => lb_mode,
|
|
||||||
"primary_reads_enabled" => false,
|
|
||||||
"query_parser_enabled" => false,
|
|
||||||
"sharding_function" => "pg_bigint_hash",
|
|
||||||
"shards" => {
|
|
||||||
"0" => {
|
|
||||||
"database" => "shard0",
|
|
||||||
"servers" => [
|
|
||||||
["localhost", primary.port.to_s, "primary"],
|
|
||||||
["localhost", replica0.port.to_s, "replica"],
|
|
||||||
["localhost", replica1.port.to_s, "replica"],
|
|
||||||
["localhost", replica2.port.to_s, "replica"]
|
|
||||||
]
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"users" => { "0" => user }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
pgcat_cfg["general"]["port"] = pgcat.port
|
pgcat_cfg["general"]["port"] = pgcat.port
|
||||||
pgcat.update_config(pgcat_cfg)
|
pgcat.update_config(pgcat_cfg)
|
||||||
|
|||||||
@@ -241,6 +241,18 @@ describe "Miscellaneous" do
|
|||||||
|
|
||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "Resets server roles correctly" do
|
||||||
|
10.times do
|
||||||
|
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||||
|
conn.async_exec("SELECT 1")
|
||||||
|
conn.async_exec("SET statement_timeout to 5000")
|
||||||
|
conn.close
|
||||||
|
end
|
||||||
|
|
||||||
|
expect(processes.primary.count_query("RESET ROLE")).to eq(10)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "transaction mode" do
|
context "transaction mode" do
|
||||||
@@ -308,6 +320,31 @@ describe "Miscellaneous" do
|
|||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "server cleanup disabled" do
|
||||||
|
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "random", "info", { "cleanup_server_connections" => false }) }
|
||||||
|
|
||||||
|
it "will not clean up connection state" do
|
||||||
|
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
processes.primary.reset_stats
|
||||||
|
conn.async_exec("SET statement_timeout TO 1000")
|
||||||
|
conn.close
|
||||||
|
|
||||||
|
puts processes.pgcat.logs
|
||||||
|
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "will not clean up prepared statements" do
|
||||||
|
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
processes.primary.reset_stats
|
||||||
|
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
|
||||||
|
|
||||||
|
conn.close
|
||||||
|
|
||||||
|
puts processes.pgcat.logs
|
||||||
|
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Idle client timeout" do
|
describe "Idle client timeout" do
|
||||||
|
|||||||
Reference in New Issue
Block a user