Compare commits

...

12 Commits

Author SHA1 Message Date
dependabot[bot]
17909d26fc chore(deps): bump socket2 from 0.4.7 to 0.5.3
Bumps [socket2](https://github.com/rust-lang/socket2) from 0.4.7 to 0.5.3.
- [Release notes](https://github.com/rust-lang/socket2/releases)
- [Changelog](https://github.com/rust-lang/socket2/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/socket2/commits)

---
updated-dependencies:
- dependency-name: socket2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-05-15 05:13:46 +00:00
Lev Kokotov
0898461c01 Allow to deploy pools without checking (#438) 2023-05-12 12:48:37 -07:00
Lev Kokotov
52b1b43850 Prewarmer (#435)
* Prewarmer

* hmm

* Tests

* default

* fix test

* Correct configuration

* Added minimal config example

* remove connect_timeout
2023-05-12 09:50:52 -07:00
Zain Kabani
0907f1b77f Improve logging for connection cleanup (#428)
* initial commit

* fix

* fmt
2023-05-11 17:40:10 -07:00
Zain Kabani
73260690b0 Fixes average stats bug (#436)
* Add test

* Fix test

* Add fix
2023-05-11 17:37:58 -07:00
Mostafa Abdelraouf
5056cbe8ed Fix docker-compose dev stack for Apple silicon (#432)
The docker-compose dev setup is broken under Apple silicon, starting the stack fails with the following error. Switching to a different docker image fixes the issue.
2023-05-10 10:24:35 -05:00
Lev Kokotov
571b02e178 Calculate averages correctly and preserve totals like before (#429)
* Reset totals after avg calculation

* like it used to be
2023-05-08 10:06:16 -07:00
Andrew Tanner
159eb89bf0 First try with role reset (#427)
* First try with role rest

* update

* extra line

* Update src/server.rs

Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>

* Update tests/ruby/misc_spec.rb

Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>

---------

Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>
2023-05-05 15:31:27 -07:00
Lev Kokotov
389993bf3e Accurate log messages (#425) 2023-05-05 08:27:19 -07:00
Lev Kokotov
ba5243b6dd Optionally validate config on boot (#423) 2023-05-03 17:07:23 -07:00
Lev Kokotov
128ef72911 lowercase config query (#422)
* lowercase config query

* remove debug
2023-05-03 16:47:20 -07:00
Lev Kokotov
811885f464 Actually plugins (#421)
* more plugins

* clean up

* fix tests

* fix flakey test
2023-05-03 16:13:45 -07:00
21 changed files with 783 additions and 308 deletions

138
Cargo.lock generated
View File

@@ -250,6 +250,12 @@ dependencies = [
"subtle",
]
[[package]]
name = "either"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
[[package]]
name = "enum-as-inner"
version = "0.5.1"
@@ -556,7 +562,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.4.9",
"tokio",
"tower-service",
"tracing",
@@ -625,7 +631,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -634,7 +640,7 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd302af1b90f2463a98fa5ad469fc212c8e3175a41c3068601bfa2727591c5be"
dependencies = [
"socket2",
"socket2 0.4.9",
"widestring",
"winapi",
"winreg",
@@ -655,7 +661,16 @@ dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
name = "itertools"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
[[package]]
@@ -701,9 +716,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.139"
version = "0.2.144"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
[[package]]
name = "link-cplusplus"
@@ -799,7 +814,7 @@ dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -871,7 +886,7 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -882,7 +897,7 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pgcat"
version = "1.0.2-alpha1"
version = "1.0.2-alpha3"
dependencies = [
"arc-swap",
"async-trait",
@@ -897,6 +912,7 @@ dependencies = [
"futures",
"hmac",
"hyper",
"itertools",
"jemallocator",
"log",
"md-5",
@@ -916,7 +932,7 @@ dependencies = [
"serde_json",
"sha-1",
"sha2",
"socket2",
"socket2 0.5.3",
"sqlparser",
"stringprep",
"tokio",
@@ -1141,7 +1157,7 @@ dependencies = [
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -1297,14 +1313,24 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "socket2"
version = "0.4.7"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "spin"
version = "0.5.2"
@@ -1446,9 +1472,9 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.4.9",
"tokio-macros",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -1827,7 +1853,16 @@ version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
"windows-targets 0.42.1",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.0",
]
[[package]]
@@ -1836,13 +1871,28 @@ version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
"windows_aarch64_gnullvm 0.42.1",
"windows_aarch64_msvc 0.42.1",
"windows_i686_gnu 0.42.1",
"windows_i686_msvc 0.42.1",
"windows_x86_64_gnu 0.42.1",
"windows_x86_64_gnullvm 0.42.1",
"windows_x86_64_msvc 0.42.1",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm 0.48.0",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm 0.48.0",
"windows_x86_64_msvc 0.48.0",
]
[[package]]
@@ -1851,42 +1901,84 @@ version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_i686_gnu"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_msvc"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "winnow"
version = "0.3.3"

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.0.2-alpha1"
version = "1.0.2-alpha3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -34,7 +34,7 @@ hyper = { version = "0.14", features = ["full"] }
phf = { version = "0.11.1", features = ["macros"] }
exitcode = "1.1.2"
futures = "0.3"
socket2 = { version = "0.4.7", features = ["all"] }
socket2 = { version = "0.5.3", features = ["all"] }
nix = "0.26.2"
atomic_enum = "0.2.0"
postgres-protocol = "0.6.5"
@@ -45,6 +45,8 @@ rustls = { version = "0.21", features = ["dangerous_configuration"] }
trust-dns-resolver = "0.22.0"
tokio-test = "0.4.2"
serde_json = "1"
itertools = "0.10"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -25,7 +25,7 @@ x-common-env-pg:
services:
main:
image: kubernetes/pause
image: gcr.io/google_containers/pause:3.2
ports:
- 6432
@@ -64,7 +64,7 @@ services:
<<: *common-env-pg
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
PGPORT: 10432
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
command: ["postgres", "-p", "10432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
toxiproxy:
build: .

22
pgcat.minimal.toml Normal file
View File

@@ -0,0 +1,22 @@
# This is an example of the most basic config
# that will mimic what PgBouncer does in transaction mode with one server.
[general]
host = "0.0.0.0"
port = 6433
admin_username = "pgcat"
admin_password = "pgcat"
[pools.pgml.users.0]
username = "postgres"
password = "postgres"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"
[pools.pgml.shards.0]
servers = [
["127.0.0.1", 28815, "primary"]
]
database = "postgres"

View File

@@ -77,8 +77,57 @@ admin_username = "admin_user"
# Password to access the virtual administrative database
admin_password = "admin_pass"
# Plugins!!
# query_router_plugins = ["pg_table_access", "intercept"]
# Default plugins that are configured on all pools.
[plugins]
# Prewarmer plugin that runs queries on server startup, before giving the connection
# to the client.
[plugins.prewarmer]
enabled = false
queries = [
"SELECT pg_prewarm('pgbench_accounts')",
]
# Log all queries to stdout.
[plugins.query_logger]
enabled = false
# Block access to tables that Postgres does not allow us to control.
[plugins.table_access]
enabled = false
tables = [
"pg_user",
"pg_roles",
"pg_database",
]
# Intercept user queries and give a fake reply.
[plugins.intercept]
enabled = true
[plugins.intercept.queries.0]
query = "select current_database() as a, current_schemas(false) as b"
schema = [
["a", "text"],
["b", "text"],
]
result = [
["${DATABASE}", "{public}"],
]
[plugins.intercept.queries.1]
query = "select current_database(), current_schema(), current_user"
schema = [
["current_database", "text"],
["current_schema", "text"],
["current_user", "text"],
]
result = [
["${DATABASE}", "public", "${USER}"],
]
# pool configs are structured as pool.<pool_name>
# the pool_name is what clients use as database name when connecting.
@@ -157,6 +206,53 @@ connect_timeout = 3000
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
# dns_max_ttl = 30
# Plugins can be configured on a pool-per-pool basis. This overrides the global plugins setting,
# so all plugins have to be configured here again.
[pool.sharded_db.plugins]
[pools.sharded_db.plugins.prewarmer]
enabled = true
queries = [
"SELECT pg_prewarm('pgbench_accounts')",
]
[pools.sharded_db.plugins.query_logger]
enabled = false
[pools.sharded_db.plugins.table_access]
enabled = false
tables = [
"pg_user",
"pg_roles",
"pg_database",
]
[pools.sharded_db.plugins.intercept]
enabled = true
[pools.sharded_db.plugins.intercept.queries.0]
query = "select current_database() as a, current_schemas(false) as b"
schema = [
["a", "text"],
["b", "text"],
]
result = [
["${DATABASE}", "{public}"],
]
[pools.sharded_db.plugins.intercept.queries.1]
query = "select current_database(), current_schema(), current_user"
schema = [
["current_database", "text"],
["current_schema", "text"],
["current_user", "text"],
]
result = [
["${DATABASE}", "public", "${USER}"],
]
# User configs are structured as pool.<pool_name>.users.<user_index>
# This section holds the credentials for users that may connect to this cluster
[pools.sharded_db.users.0]

View File

@@ -122,6 +122,16 @@ impl Default for Address {
}
}
impl std::fmt::Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"[address: {}:{}][database: {}][user: {}]",
self.host, self.port, self.database, self.username
)
}
}
// We need to implement PartialEq by ourselves so we skip stats in the comparison
impl PartialEq for Address {
fn eq(&self, other: &Self) -> bool {
@@ -235,6 +245,8 @@ pub struct General {
pub port: u16,
pub enable_prometheus_exporter: Option<bool>,
#[serde(default = "General::default_prometheus_exporter_port")]
pub prometheus_exporter_port: i16,
#[serde(default = "General::default_connect_timeout")]
@@ -298,12 +310,13 @@ pub struct General {
pub admin_username: String,
pub admin_password: String,
#[serde(default = "General::default_validate_config")]
pub validate_config: bool,
// Support for auth query
pub auth_query: Option<String>,
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,
pub query_router_plugins: Option<Vec<String>>,
}
impl General {
@@ -369,6 +382,14 @@ impl General {
pub fn default_idle_client_in_transaction_timeout() -> u64 {
0
}
pub fn default_validate_config() -> bool {
true
}
pub fn default_prometheus_exporter_port() -> i16 {
9930
}
}
impl Default for General {
@@ -404,7 +425,7 @@ impl Default for General {
auth_query_user: None,
auth_query_password: None,
server_lifetime: 1000 * 3600 * 24, // 24 hours,
query_router_plugins: None,
validate_config: true,
}
}
}
@@ -457,6 +478,7 @@ pub struct Pool {
#[serde(default = "Pool::default_load_balancing_mode")]
pub load_balancing_mode: LoadBalancingMode,
#[serde(default = "Pool::default_default_role")]
pub default_role: String,
#[serde(default)] // False
@@ -471,6 +493,7 @@ pub struct Pool {
pub server_lifetime: Option<u64>,
#[serde(default = "Pool::default_sharding_function")]
pub sharding_function: ShardingFunction,
#[serde(default = "Pool::default_automatic_sharding_key")]
@@ -484,6 +507,7 @@ pub struct Pool {
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,
pub plugins: Option<Plugins>,
pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>,
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
@@ -516,6 +540,14 @@ impl Pool {
None
}
pub fn default_default_role() -> String {
"any".into()
}
pub fn default_sharding_function() -> ShardingFunction {
ShardingFunction::PgBigintHash
}
pub fn validate(&mut self) -> Result<(), Error> {
match self.default_role.as_ref() {
"any" => (),
@@ -604,6 +636,7 @@ impl Default for Pool {
auth_query_user: None,
auth_query_password: None,
server_lifetime: None,
plugins: None,
}
}
}
@@ -682,6 +715,76 @@ impl Default for Shard {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Plugins {
pub intercept: Option<Intercept>,
pub table_access: Option<TableAccess>,
pub query_logger: Option<QueryLogger>,
pub prewarmer: Option<Prewarmer>,
}
impl std::fmt::Display for Plugins {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}",
self.intercept.is_some(),
self.table_access.is_some(),
self.query_logger.is_some(),
self.prewarmer.is_some(),
)
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Intercept {
pub enabled: bool,
pub queries: BTreeMap<String, Query>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct TableAccess {
pub enabled: bool,
pub tables: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct QueryLogger {
pub enabled: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Prewarmer {
pub enabled: bool,
pub queries: Vec<String>,
}
impl Intercept {
pub fn substitute(&mut self, db: &str, user: &str) {
for (_, query) in self.queries.iter_mut() {
query.substitute(db, user);
query.query = query.query.to_ascii_lowercase();
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Query {
pub query: String,
pub schema: Vec<Vec<String>>,
pub result: Vec<Vec<String>>,
}
impl Query {
pub fn substitute(&mut self, db: &str, user: &str) {
for col in self.result.iter_mut() {
for i in 0..col.len() {
col[i] = col[i].replace("${USER}", user).replace("${DATABASE}", db);
}
}
}
}
/// Configuration wrapper.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Config {
@@ -699,7 +802,13 @@ pub struct Config {
#[serde(default = "Config::default_path")]
pub path: String,
// General and global settings.
pub general: General,
// Plugins that should run in all pools.
pub plugins: Option<Plugins>,
// Connection pools.
pub pools: HashMap<String, Pool>,
}
@@ -737,6 +846,7 @@ impl Default for Config {
path: Self::default_path(),
general: General::default(),
pools: HashMap::default(),
plugins: None,
}
}
}
@@ -883,6 +993,13 @@ impl Config {
"Server TLS certificate verification: {}",
self.general.verify_server_certificate
);
info!(
"Plugins: {}",
match self.plugins {
Some(ref plugins) => plugins.to_string(),
None => "not configured".into(),
}
);
for (pool_name, pool_config) in &self.pools {
// TODO: Make this output prettier (maybe a table?)
@@ -949,6 +1066,14 @@ impl Config {
None => "default".to_string(),
}
);
info!(
"[pool: {}] Plugins: {}",
pool_name,
match pool_config.plugins {
Some(ref plugins) => plugins.to_string(),
None => "not configured".into(),
}
);
for user in &pool_config.users {
info!(
@@ -1128,6 +1253,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, Error> {
let old_config = get_config();
match parse(&old_config.path).await {
Ok(()) => (),
Err(err) => {
@@ -1135,18 +1261,18 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E
return Err(Error::BadConfig);
}
};
let new_config = get_config();
match CachedResolver::from_config().await {
Ok(_) => (),
Err(err) => error!("DNS cache reinitialization error: {:?}", err),
};
if old_config.pools != new_config.pools {
info!("Pool configuration changed");
if old_config != new_config {
info!("Config changed, reloading");
ConnectionPool::from_config(client_server_map).await?;
Ok(true)
} else if old_config != new_config {
Ok(true)
} else {
Ok(false)
}

View File

@@ -43,6 +43,7 @@ impl MirroredClient {
ClientServerMap::default(),
Arc::new(PoolStats::new(identifier, cfg.clone())),
Arc::new(RwLock::new(None)),
None,
);
Pool::builder()

View File

@@ -2,41 +2,21 @@
//!
//! It intercepts queries and returns fake results.
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sqlparser::ast::Statement;
use std::collections::HashMap;
use log::debug;
use std::sync::Arc;
use crate::{
config::Intercept as InterceptConfig,
errors::Error,
messages::{command_complete, data_row_nullable, row_description, DataType},
plugins::{Plugin, PluginOutput},
pool::{PoolIdentifier, PoolMap},
query_router::QueryRouter,
};
pub static CONFIG: Lazy<ArcSwap<HashMap<PoolIdentifier, Value>>> =
Lazy::new(|| ArcSwap::from_pointee(HashMap::new()));
/// Configure the intercept plugin.
pub fn configure(pools: &PoolMap) {
let mut config = HashMap::new();
for (identifier, _) in pools.iter() {
// TODO: make this configurable from a text config.
let value = fool_datagrip(&identifier.db, &identifier.user);
config.insert(identifier.clone(), value);
}
CONFIG.store(Arc::new(config));
}
// TODO: use these structs for deserialization
#[derive(Serialize, Deserialize)]
pub struct Rule {
@@ -52,45 +32,47 @@ pub struct Column {
}
/// The intercept plugin.
pub struct Intercept;
pub struct Intercept<'a> {
pub enabled: bool,
pub config: &'a InterceptConfig,
}
#[async_trait]
impl Plugin for Intercept {
impl<'a> Plugin for Intercept<'a> {
async fn run(
&mut self,
query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error> {
if ast.is_empty() {
if !self.enabled || ast.is_empty() {
return Ok(PluginOutput::Allow);
}
let mut result = BytesMut::new();
let query_map = match CONFIG.load().get(&PoolIdentifier::new(
let mut config = self.config.clone();
config.substitute(
&query_router.pool_settings().db,
&query_router.pool_settings().user.username,
)) {
Some(query_map) => query_map.clone(),
None => return Ok(PluginOutput::Allow),
};
);
let mut result = BytesMut::new();
for q in ast {
// Normalization
let q = q.to_string().to_ascii_lowercase();
for target in query_map.as_array().unwrap().iter() {
if target["query"].as_str().unwrap() == q {
debug!("Query matched: {}", q);
for (_, target) in config.queries.iter() {
if target.query.as_str() == q {
debug!("Intercepting query: {}", q);
let rd = target["schema"]
.as_array()
.unwrap()
let rd = target
.schema
.iter()
.map(|row| {
let row = row.as_object().unwrap();
let name = &row[0];
let data_type = &row[1];
(
row["name"].as_str().unwrap(),
match row["data_type"].as_str().unwrap() {
name.as_str(),
match data_type.as_str() {
"text" => DataType::Text,
"anyarray" => DataType::AnyArray,
"oid" => DataType::Oid,
@@ -104,13 +86,11 @@ impl Plugin for Intercept {
result.put(row_description(&rd));
target["result"].as_array().unwrap().iter().for_each(|row| {
target.result.iter().for_each(|row| {
let row = row
.as_array()
.unwrap()
.iter()
.map(|s| {
let s = s.as_str().unwrap().to_string();
let s = s.as_str().to_string();
if s == "" {
None
@@ -138,141 +118,3 @@ impl Plugin for Intercept {
}
}
}
/// Make IntelliJ SQL plugin believe it's talking to an actual database
/// instead of PgCat.
fn fool_datagrip(database: &str, user: &str) -> Value {
json!([
{
"query": "select current_database() as a, current_schemas(false) as b",
"schema": [
{
"name": "a",
"data_type": "text",
},
{
"name": "b",
"data_type": "anyarray",
},
],
"result": [
[database, "{public}"],
],
},
{
"query": "select current_database(), current_schema(), current_user",
"schema": [
{
"name": "current_database",
"data_type": "text",
},
{
"name": "current_schema",
"data_type": "text",
},
{
"name": "current_user",
"data_type": "text",
}
],
"result": [
["sharded_db", "public", "sharding_user"],
],
},
{
"query": "select cast(n.oid as bigint) as id, datname as name, d.description, datistemplate as is_template, datallowconn as allow_connections, pg_catalog.pg_get_userbyid(n.datdba) as \"owner\" from pg_catalog.pg_database as n left join pg_catalog.pg_shdescription as d on n.oid = d.objoid order by case when datname = pg_catalog.current_database() then -cast(1 as bigint) else cast(n.oid as bigint) end",
"schema": [
{
"name": "id",
"data_type": "oid",
},
{
"name": "name",
"data_type": "text",
},
{
"name": "description",
"data_type": "text",
},
{
"name": "is_template",
"data_type": "bool",
},
{
"name": "allow_connections",
"data_type": "bool",
},
{
"name": "owner",
"data_type": "text",
}
],
"result": [
["16387", database, "", "f", "t", user],
]
},
{
"query": "select cast(r.oid as bigint) as role_id, rolname as role_name, rolsuper as is_super, rolinherit as is_inherit, rolcreaterole as can_createrole, rolcreatedb as can_createdb, rolcanlogin as can_login, rolreplication as is_replication, rolconnlimit as conn_limit, rolvaliduntil as valid_until, rolbypassrls as bypass_rls, rolconfig as config, d.description from pg_catalog.pg_roles as r left join pg_catalog.pg_shdescription as d on d.objoid = r.oid",
"schema": [
{
"name": "role_id",
"data_type": "oid",
},
{
"name": "role_name",
"data_type": "text",
},
{
"name": "is_super",
"data_type": "bool",
},
{
"name": "is_inherit",
"data_type": "bool",
},
{
"name": "can_createrole",
"data_type": "bool",
},
{
"name": "can_createdb",
"data_type": "bool",
},
{
"name": "can_login",
"data_type": "bool",
},
{
"name": "is_replication",
"data_type": "bool",
},
{
"name": "conn_limit",
"data_type": "int4",
},
{
"name": "valid_until",
"data_type": "text",
},
{
"name": "bypass_rls",
"data_type": "bool",
},
{
"name": "config",
"data_type": "text",
},
{
"name": "description",
"data_type": "text",
},
],
"result": [
["10", "postgres", "f", "t", "f", "f", "t", "f", "-1", "", "f", "", ""],
["16419", user, "f", "t", "f", "f", "t", "f", "-1", "", "f", "", ""],
]
}
])
}

View File

@@ -9,6 +9,8 @@
//!
pub mod intercept;
pub mod prewarmer;
pub mod query_logger;
pub mod table_access;
use crate::{errors::Error, query_router::QueryRouter};
@@ -17,6 +19,7 @@ use bytes::BytesMut;
use sqlparser::ast::Statement;
pub use intercept::Intercept;
pub use query_logger::QueryLogger;
pub use table_access::TableAccess;
#[derive(Clone, Debug, PartialEq)]
@@ -29,12 +32,13 @@ pub enum PluginOutput {
#[async_trait]
pub trait Plugin {
// Custom output is allowed because we want to extend this system
// to rewriting queries some day. So an output of a plugin could be
// a rewritten AST.
// Run before the query is sent to the server.
async fn run(
&mut self,
query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error>;
// TODO: run after the result is returned
// async fn callback(&mut self, query_router: &QueryRouter);
}

28
src/plugins/prewarmer.rs Normal file
View File

@@ -0,0 +1,28 @@
//! Prewarm new connections before giving them to the client.
use crate::{errors::Error, server::Server};
use log::info;
pub struct Prewarmer<'a> {
pub enabled: bool,
pub server: &'a mut Server,
pub queries: &'a Vec<String>,
}
impl<'a> Prewarmer<'a> {
pub async fn run(&mut self) -> Result<(), Error> {
if !self.enabled {
return Ok(());
}
for query in self.queries {
info!(
"{} Prewarning with query: `{}`",
self.server.address(),
query
);
self.server.query(&query).await?;
}
Ok(())
}
}

View File

@@ -0,0 +1,38 @@
//! Log all queries to stdout (or somewhere else, why not).
use crate::{
errors::Error,
plugins::{Plugin, PluginOutput},
query_router::QueryRouter,
};
use async_trait::async_trait;
use log::info;
use sqlparser::ast::Statement;
pub struct QueryLogger<'a> {
pub enabled: bool,
pub user: &'a str,
pub db: &'a str,
}
#[async_trait]
impl<'a> Plugin for QueryLogger<'a> {
async fn run(
&mut self,
_query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error> {
if !self.enabled {
return Ok(PluginOutput::Allow);
}
let query = ast
.iter()
.map(|q| q.to_string())
.collect::<Vec<String>>()
.join("; ");
info!("[pool: {}][user: {}] {}", self.user, self.db, query);
Ok(PluginOutput::Allow)
}
}

View File

@@ -10,19 +10,26 @@ use crate::{
query_router::QueryRouter,
};
use log::debug;
use core::ops::ControlFlow;
pub struct TableAccess {
pub forbidden_tables: Vec<String>,
pub struct TableAccess<'a> {
pub enabled: bool,
pub tables: &'a Vec<String>,
}
#[async_trait]
impl Plugin for TableAccess {
impl<'a> Plugin for TableAccess<'a> {
async fn run(
&mut self,
_query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error> {
if !self.enabled {
return Ok(PluginOutput::Allow);
}
let mut found = None;
visit_relations(ast, |relation| {
@@ -30,7 +37,7 @@ impl Plugin for TableAccess {
let parts = relation.split(".").collect::<Vec<&str>>();
let table_name = parts.last().unwrap();
if self.forbidden_tables.contains(&table_name.to_string()) {
if self.tables.contains(&table_name.to_string()) {
found = Some(table_name.to_string());
ControlFlow::<()>::Break(())
} else {
@@ -39,6 +46,8 @@ impl Plugin for TableAccess {
});
if let Some(found) = found {
debug!("Blocking access to table \"{}\"", found);
Ok(PluginOutput::Deny(format!(
"permission for table \"{}\" denied",
found

View File

@@ -17,10 +17,13 @@ use std::sync::{
use std::time::Instant;
use tokio::sync::Notify;
use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User};
use crate::config::{
get_config, Address, General, LoadBalancingMode, Plugins, PoolMode, Role, User,
};
use crate::errors::Error;
use crate::auth_passthrough::AuthPassthrough;
use crate::plugins::prewarmer;
use crate::server::Server;
use crate::sharding::ShardingFunction;
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
@@ -133,7 +136,8 @@ pub struct PoolSettings {
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,
pub plugins: Option<Vec<String>>,
/// Plugins
pub plugins: Option<Plugins>,
}
impl Default for PoolSettings {
@@ -198,6 +202,7 @@ pub struct ConnectionPool {
paused: Arc<AtomicBool>,
paused_waiter: Arc<Notify>,
/// Statistics.
pub stats: Arc<PoolStats>,
/// AuthInfo
@@ -355,6 +360,10 @@ impl ConnectionPool {
client_server_map.clone(),
pool_stats.clone(),
pool_auth_hash.clone(),
match pool_config.plugins {
Some(ref plugins) => Some(plugins.clone()),
None => config.plugins.clone(),
},
);
let connect_timeout = match pool_config.connect_timeout {
@@ -380,7 +389,10 @@ impl ConnectionPool {
.min()
.unwrap();
debug!("Pool reaper rate: {}ms", reaper_rate);
debug!(
"[pool: {}][user: {}] Pool reaper rate: {}ms",
pool_name, user.username, reaper_rate
);
let pool = Pool::builder()
.max_size(user.pool_size)
@@ -389,9 +401,13 @@ impl ConnectionPool {
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
.reaper_rate(std::time::Duration::from_millis(reaper_rate))
.test_on_check_out(false)
.build(manager)
.await?;
.test_on_check_out(false);
let pool = if config.general.validate_config {
pool.build(manager).await?
} else {
pool.build_unchecked(manager)
};
pools.push(pool);
servers.push(address);
@@ -453,7 +469,10 @@ impl ConnectionPool {
auth_query: pool_config.auth_query.clone(),
auth_query_user: pool_config.auth_query_user.clone(),
auth_query_password: pool_config.auth_query_password.clone(),
plugins: config.general.query_router_plugins.clone(),
plugins: match pool_config.plugins {
Some(ref plugins) => Some(plugins.clone()),
None => config.plugins.clone(),
},
},
validated: Arc::new(AtomicBool::new(false)),
paused: Arc::new(AtomicBool::new(false)),
@@ -463,23 +482,18 @@ impl ConnectionPool {
// Connect to the servers to make sure pool configuration is valid
// before setting it globally.
// Do this async and somewhere else, we don't have to wait here.
let mut validate_pool = pool.clone();
tokio::task::spawn(async move {
let _ = validate_pool.validate().await;
});
if config.general.validate_config {
let mut validate_pool = pool.clone();
tokio::task::spawn(async move {
let _ = validate_pool.validate().await;
});
}
// There is one pool per database/user pair.
new_pools.insert(PoolIdentifier::new(pool_name, &user.username), pool);
}
}
// Initialize plugins here if required.
if let Some(plugins) = config.general.query_router_plugins {
if plugins.contains(&String::from("intercept")) {
crate::plugins::intercept::configure(&new_pools);
}
}
POOLS.store(Arc::new(new_pools.clone()));
Ok(())
}
@@ -624,7 +638,10 @@ impl ConnectionPool {
{
Ok(conn) => conn,
Err(err) => {
error!("Banning instance {:?}, error: {:?}", address, err);
error!(
"Connection checkout error for instance {:?}, error: {:?}",
address, err
);
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
address.stats.error();
client_stats.idle();
@@ -700,7 +717,7 @@ impl ConnectionPool {
// Health check failed.
Err(err) => {
error!(
"Banning instance {:?} because of failed health check, {:?}",
"Failed health check on instance {:?}, error: {:?}",
address, err
);
}
@@ -709,7 +726,7 @@ impl ConnectionPool {
// Health check timed out.
Err(err) => {
error!(
"Banning instance {:?} because of health check timeout, {:?}",
"Health check timeout on instance {:?}, error: {:?}",
address, err
);
}
@@ -731,13 +748,16 @@ impl ConnectionPool {
return;
}
error!("Banning instance {:?}, reason: {:?}", address, reason);
let now = chrono::offset::Utc::now().naive_utc();
let mut guard = self.banlist.write();
error!("Banning {:?}", address);
if let Some(client_info) = client_info {
client_info.ban_error();
address.stats.error();
}
guard[address.shard].insert(address.clone(), (reason, now));
}
@@ -900,6 +920,7 @@ pub struct ServerPool {
client_server_map: ClientServerMap,
stats: Arc<PoolStats>,
auth_hash: Arc<RwLock<Option<String>>>,
plugins: Option<Plugins>,
}
impl ServerPool {
@@ -910,6 +931,7 @@ impl ServerPool {
client_server_map: ClientServerMap,
stats: Arc<PoolStats>,
auth_hash: Arc<RwLock<Option<String>>>,
plugins: Option<Plugins>,
) -> ServerPool {
ServerPool {
address,
@@ -918,6 +940,7 @@ impl ServerPool {
client_server_map,
stats,
auth_hash,
plugins,
}
}
}
@@ -950,7 +973,19 @@ impl ManageConnection for ServerPool {
)
.await
{
Ok(conn) => {
Ok(mut conn) => {
if let Some(ref plugins) = self.plugins {
if let Some(ref prewarmer) = plugins.prewarmer {
let mut prewarmer = prewarmer::Prewarmer {
enabled: prewarmer.enabled,
server: &mut conn,
queries: &prewarmer.queries,
};
prewarmer.run().await?;
}
}
stats.idle();
Ok(conn)
}

View File

@@ -15,7 +15,7 @@ use sqlparser::parser::Parser;
use crate::config::Role;
use crate::errors::Error;
use crate::messages::BytesMutReader;
use crate::plugins::{Intercept, Plugin, PluginOutput, TableAccess};
use crate::plugins::{Intercept, Plugin, PluginOutput, QueryLogger, TableAccess};
use crate::pool::PoolSettings;
use crate::sharding::Sharder;
@@ -790,24 +790,44 @@ impl QueryRouter {
/// Add your plugins here and execute them.
pub async fn execute_plugins(&self, ast: &Vec<Statement>) -> Result<PluginOutput, Error> {
if let Some(plugins) = &self.pool_settings.plugins {
if plugins.contains(&String::from("intercept")) {
let mut intercept = Intercept {};
let result = intercept.run(&self, ast).await;
let plugins = match self.pool_settings.plugins {
Some(ref plugins) => plugins,
None => return Ok(PluginOutput::Allow),
};
if let Ok(PluginOutput::Intercept(output)) = result {
return Ok(PluginOutput::Intercept(output));
}
if let Some(ref query_logger) = plugins.query_logger {
let mut query_logger = QueryLogger {
enabled: query_logger.enabled,
user: &self.pool_settings.user.username,
db: &self.pool_settings.db,
};
let _ = query_logger.run(&self, ast).await;
}
if let Some(ref intercept) = plugins.intercept {
let mut intercept = Intercept {
enabled: intercept.enabled,
config: &intercept,
};
let result = intercept.run(&self, ast).await;
if let Ok(PluginOutput::Intercept(output)) = result {
return Ok(PluginOutput::Intercept(output));
}
}
if plugins.contains(&String::from("pg_table_access")) {
let mut table_access = TableAccess {
forbidden_tables: vec![String::from("pg_database"), String::from("pg_roles")],
};
if let Some(ref table_access) = plugins.table_access {
let mut table_access = TableAccess {
enabled: table_access.enabled,
tables: &table_access.tables,
};
if let Ok(PluginOutput::Deny(error)) = table_access.run(&self, ast).await {
return Ok(PluginOutput::Deny(error));
}
let result = table_access.run(&self, ast).await;
if let Ok(PluginOutput::Deny(error)) = result {
return Ok(PluginOutput::Deny(error));
}
}
@@ -1233,6 +1253,7 @@ mod test {
db: "test".to_string(),
plugins: None,
};
let mut qr = QueryRouter::new();
qr.update_pool_settings(pool_settings.clone());
@@ -1376,12 +1397,24 @@ mod test {
#[tokio::test]
async fn test_table_access_plugin() {
use crate::config::{Plugins, TableAccess};
let table_access = TableAccess {
enabled: true,
tables: vec![String::from("pg_database")],
};
let plugins = Plugins {
table_access: Some(table_access),
intercept: None,
query_logger: None,
prewarmer: None,
};
QueryRouter::setup();
let mut pool_settings = PoolSettings::default();
pool_settings.query_parser_enabled = true;
pool_settings.plugins = Some(plugins);
let mut qr = QueryRouter::new();
let mut pool_settings = PoolSettings::default();
pool_settings.plugins = Some(vec![String::from("pg_table_access")]);
qr.update_pool_settings(pool_settings);
let query = simple_query("SELECT * FROM pg_database");
@@ -1396,4 +1429,17 @@ mod test {
))
);
}
#[tokio::test]
async fn test_plugins_disabled_by_defaault() {
QueryRouter::setup();
let qr = QueryRouter::new();
let query = simple_query("SELECT * FROM pg_database");
let ast = QueryRouter::parse(&query).unwrap();
let res = qr.execute_plugins(&ast).await;
assert_eq!(res, Ok(PluginOutput::Allow));
}
}

View File

@@ -103,6 +103,48 @@ impl StreamInner {
}
}
#[derive(Copy, Clone)]
struct CleanupState {
/// If server connection requires DISCARD ALL before checkin because of set statement
needs_cleanup_set: bool,
/// If server connection requires DISCARD ALL before checkin because of prepare statement
needs_cleanup_prepare: bool,
}
impl CleanupState {
fn new() -> Self {
CleanupState {
needs_cleanup_set: false,
needs_cleanup_prepare: false,
}
}
fn needs_cleanup(&self) -> bool {
self.needs_cleanup_set || self.needs_cleanup_prepare
}
fn set_true(&mut self) {
self.needs_cleanup_set = true;
self.needs_cleanup_prepare = true;
}
fn reset(&mut self) {
self.needs_cleanup_set = false;
self.needs_cleanup_prepare = false;
}
}
impl std::fmt::Display for CleanupState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SET: {}, PREPARE: {}",
self.needs_cleanup_set, self.needs_cleanup_prepare
)
}
}
/// Server state.
pub struct Server {
/// Server host, e.g. localhost,
@@ -131,8 +173,8 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,
/// If server connection requires a DISCARD ALL before checkin
needs_cleanup: bool,
/// If server connection requires DISCARD ALL before checkin
cleanup_state: CleanupState,
/// Mapping of clients and servers used for query cancellation.
client_server_map: ClientServerMap,
@@ -630,7 +672,7 @@ impl Server {
in_transaction: false,
data_available: false,
bad: false,
needs_cleanup: false,
cleanup_state: CleanupState::new(),
client_server_map,
addr_set,
connected_at: chrono::offset::Utc::now().naive_utc(),
@@ -705,7 +747,10 @@ impl Server {
Ok(())
}
Err(err) => {
error!("Terminating server because of: {:?}", err);
error!(
"Terminating server {:?} because of: {:?}",
self.address, err
);
self.bad = true;
Err(err)
}
@@ -720,7 +765,10 @@ impl Server {
let mut message = match read_message(&mut self.stream).await {
Ok(message) => message,
Err(err) => {
error!("Terminating server because of: {:?}", err);
error!(
"Terminating server {:?} because of: {:?}",
self.address, err
);
self.bad = true;
return Err(err);
}
@@ -787,12 +835,12 @@ impl Server {
// This will reduce amount of discard statements sent
if !self.in_transaction {
debug!("Server connection marked for clean up");
self.needs_cleanup = true;
self.cleanup_state.needs_cleanup_set = true;
}
}
"PREPARE\0" => {
debug!("Server connection marked for clean up");
self.needs_cleanup = true;
self.cleanup_state.needs_cleanup_prepare = true;
}
_ => (),
}
@@ -922,6 +970,8 @@ impl Server {
/// It will use the simple query protocol.
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
debug!("Running `{}` on server {:?}", query, self.address);
let query = simple_query(query);
self.send(&query).await?;
@@ -954,10 +1004,11 @@ impl Server {
// to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.needs_cleanup {
warn!("Server returned with session state altered, discarding state");
if self.cleanup_state.needs_cleanup() {
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?;
self.needs_cleanup = false;
self.query("RESET ROLE").await?;
self.cleanup_state.reset();
}
Ok(())
@@ -969,12 +1020,12 @@ impl Server {
self.application_name = name.to_string();
// We don't want `SET application_name` to mark the server connection
// as needing cleanup
let needs_cleanup_before = self.needs_cleanup;
let needs_cleanup_before = self.cleanup_state;
let result = Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?);
self.needs_cleanup = needs_cleanup_before;
self.cleanup_state = needs_cleanup_before;
result
} else {
Ok(())
@@ -999,7 +1050,7 @@ impl Server {
// Marks a connection as needing DISCARD ALL at checkin
pub fn mark_dirty(&mut self) {
self.needs_cleanup = true;
self.cleanup_state.set_true();
}
pub fn mirror_send(&mut self, bytes: &BytesMut) {
@@ -1135,14 +1186,18 @@ impl Drop for Server {
_ => debug!("Dirty shutdown"),
};
// Should not matter.
self.bad = true;
let now = chrono::offset::Utc::now().naive_utc();
let duration = now - self.connected_at;
let message = if self.bad {
"Server connection terminated"
} else {
"Server connection closed"
};
info!(
"Server connection closed {:?}, session duration: {}",
"{} {:?}, session duration: {}",
message,
self.address,
crate::format_duration(&duration)
);

View File

@@ -107,8 +107,19 @@ impl Collector {
loop {
interval.tick().await;
for stats in SERVER_STATS.read().values() {
stats.address_stats().update_averages();
// Hold read lock for duration of update to retain all server stats
let server_stats = SERVER_STATS.read();
for stats in server_stats.values() {
if !stats.check_address_stat_average_is_updated_status() {
stats.address_stats().update_averages();
stats.set_address_stat_average_is_updated_status(true);
}
}
// Reset to false for next update
for stats in server_stats.values() {
stats.set_address_stat_average_is_updated_status(false);
}
}
});

View File

@@ -1,4 +1,3 @@
use log::warn;
use std::sync::atomic::*;
use std::sync::Arc;
@@ -13,6 +12,16 @@ pub struct AddressStats {
pub total_query_time: Arc<AtomicU64>,
pub total_wait_time: Arc<AtomicU64>,
pub total_errors: Arc<AtomicU64>,
pub old_total_xact_count: Arc<AtomicU64>,
pub old_total_query_count: Arc<AtomicU64>,
pub old_total_received: Arc<AtomicU64>,
pub old_total_sent: Arc<AtomicU64>,
pub old_total_xact_time: Arc<AtomicU64>,
pub old_total_query_time: Arc<AtomicU64>,
pub old_total_wait_time: Arc<AtomicU64>,
pub old_total_errors: Arc<AtomicU64>,
pub avg_query_count: Arc<AtomicU64>,
pub avg_query_time: Arc<AtomicU64>,
pub avg_recv: Arc<AtomicU64>,
@@ -21,6 +30,9 @@ pub struct AddressStats {
pub avg_xact_time: Arc<AtomicU64>,
pub avg_xact_count: Arc<AtomicU64>,
pub avg_wait_time: Arc<AtomicU64>,
// Determines if the averages have been updated since the last time they were reported
pub averages_updated: Arc<AtomicBool>,
}
impl IntoIterator for AddressStats {
@@ -104,16 +116,15 @@ impl AddressStats {
}
pub fn update_averages(&self) {
let (totals, averages) = self.fields_iterators();
for data in totals.iter().zip(averages.iter()) {
let (total, average) = data;
if let Err(err) = average.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
let total = total.load(Ordering::Relaxed);
let avg = (total - avg) / (crate::stats::STAT_PERIOD / 1_000); // Avg / second
Some(avg)
}) {
warn!("Could not update averages for addresses stats, {:?}", err);
}
let (totals, averages, old_totals) = self.fields_iterators();
for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) {
let total_value = total.load(Ordering::Relaxed);
let old_total_value = old_total.load(Ordering::Relaxed);
average.store(
(total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000),
Ordering::Relaxed,
); // Avg / second
old_total.store(total_value, Ordering::Relaxed);
}
}
@@ -123,27 +134,42 @@ impl AddressStats {
}
}
fn fields_iterators(&self) -> (Vec<Arc<AtomicU64>>, Vec<Arc<AtomicU64>>) {
fn fields_iterators(
&self,
) -> (
Vec<Arc<AtomicU64>>,
Vec<Arc<AtomicU64>>,
Vec<Arc<AtomicU64>>,
) {
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
let mut old_totals: Vec<Arc<AtomicU64>> = Vec::new();
totals.push(self.total_xact_count.clone());
old_totals.push(self.old_total_xact_count.clone());
averages.push(self.avg_xact_count.clone());
totals.push(self.total_query_count.clone());
old_totals.push(self.old_total_query_count.clone());
averages.push(self.avg_query_count.clone());
totals.push(self.total_received.clone());
old_totals.push(self.old_total_received.clone());
averages.push(self.avg_recv.clone());
totals.push(self.total_sent.clone());
old_totals.push(self.old_total_sent.clone());
averages.push(self.avg_sent.clone());
totals.push(self.total_xact_time.clone());
old_totals.push(self.old_total_xact_time.clone());
averages.push(self.avg_xact_time.clone());
totals.push(self.total_query_time.clone());
old_totals.push(self.old_total_query_time.clone());
averages.push(self.avg_query_time.clone());
totals.push(self.total_wait_time.clone());
old_totals.push(self.old_total_wait_time.clone());
averages.push(self.avg_wait_time.clone());
totals.push(self.total_errors.clone());
old_totals.push(self.old_total_errors.clone());
averages.push(self.avg_errors.clone());
(totals, averages)
(totals, averages, old_totals)
}
}

View File

@@ -139,6 +139,17 @@ impl ServerStats {
self.address.stats.clone()
}
pub fn check_address_stat_average_is_updated_status(&self) -> bool {
self.address.stats.averages_updated.load(Ordering::Relaxed)
}
pub fn set_address_stat_average_is_updated_status(&self, is_checked: bool) {
self.address
.stats
.averages_updated
.store(is_checked, Ordering::Relaxed);
}
// Helper methods for show_servers
pub fn pool_name(&self) -> String {
self.pool_stats.database()

View File

@@ -14,11 +14,12 @@ describe "Admin" do
describe "SHOW STATS" do
context "clients connect and make one query" do
it "updates *_query_time and *_wait_time" do
connection = PG::connect("#{pgcat_conn_str}?application_name=one_query")
connection.async_exec("SELECT pg_sleep(0.25)")
connection.async_exec("SELECT pg_sleep(0.25)")
connection.async_exec("SELECT pg_sleep(0.25)")
connection.close
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
end
sleep(1)
connections.map(&:close)
# wait for averages to be calculated, we shouldn't do this too often
sleep(15.5)
@@ -26,7 +27,7 @@ describe "Admin" do
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
expect(results["total_query_time"].to_i).to be_within(200).of(750)
expect(results["avg_query_time"].to_i).to_not eq(0)
expect(results["avg_query_time"].to_i).to be_within(20).of(50)
expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
@@ -71,15 +72,17 @@ describe "Admin" do
context "client connects but issues no queries" do
it "only affects cl_idle stats" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("20")
expect(results["sv_idle"]).to eq("1")
expect(results["sv_idle"]).to eq(before_test)
connections.map(&:close)
sleep(1.1)
@@ -87,7 +90,7 @@ describe "Admin" do
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
expect(results["sv_idle"]).to eq(before_test)
end
end

View File

@@ -27,7 +27,6 @@ module Helpers
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
pgcat_cfg = pgcat.current_config
pgcat_cfg["general"]["query_router_plugins"] = ["intercept"]
pgcat_cfg["pools"] = {
"#{pool_name}" => {
"default_role" => "any",
@@ -42,7 +41,24 @@ module Helpers
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
},
"users" => { "0" => user }
"users" => { "0" => user },
"plugins" => {
"intercept" => {
"enabled" => true,
"queries" => {
"0" => {
"query" => "select current_database() as a, current_schemas(false) as b",
"schema" => [
["a", "text"],
["b", "text"],
],
"result" => [
["${DATABASE}", "{public}"],
]
}
}
}
}
}
}
pgcat.update_config(pgcat_cfg)

View File

@@ -241,6 +241,18 @@ describe "Miscellaneous" do
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
end
it "Resets server roles correctly" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.async_exec("SET statement_timeout to 5000")
conn.close
end
expect(processes.primary.count_query("RESET ROLE")).to eq(10)
end
end
context "transaction mode" do