From a8a30ad43b16cd30a5c66357a88b2758307feb54 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Tue, 23 May 2023 08:44:49 -0500 Subject: [PATCH] Refactor Pool Stats to be based off of Server/Client stats (#445) What is wrong Stats reported by SHOW POOLS seem to be leaking. We see lingering cl_idle, cl_waiting, and similarly for sv_idle, sv_active. We confirmed that these are reporting issues, not actual lingering clients. This behavior is readily reproducible by running while true; do psql "postgres://sharding_user:sharding_user@localhost:6432/sharded_db" -c "SELECT 1" > /dev/null 2>&1 & done Why it happens I wasn't able to figure out the reason for the bug but my best guess is that we have race conditions when updating pool-level stats. So even though individual update operations are atomic, we perform a check-then-update sequence which is not protected by a guard. https://github.com/postgresml/pgcat/blob/main/src/stats/pool.rs#L174-L179 I am also suspecting that using Relaxed ordering might allow this behavior (I changed all operations to use Ordering::SeqCst but still got lingering clients) How to fix Since SHOW POOLS/SHOW SERVER/SHOW CLIENTS all show the current state of the proxy (as opposed to SHOW STATS, which shows aggregate values), this PR refactors SHOW POOLS to have it construct the results directly from SHOW SERVER and SHOW CLIENT datasets. This reduces the complexity of stat updates and eliminates the need for having locks when updating pool stats as we only care about updating individual client/server states. This will change the semantics of maxwait, so instead of it holding the maxwait time ever encountered by a client (connected or disconnected), it will only consider connected clients, which should be okay given PgCat tends to hold on to client connections more than Pgbouncer. 
--- src/admin.rs | 40 +--- src/client.rs | 14 +- src/mirrors.rs | 8 +- src/pool.rs | 42 ++--- src/prometheus.rs | 24 ++- src/stats.rs | 21 --- src/stats/client.rs | 20 +- src/stats/pool.rs | 385 +++++++++++++-------------------------- src/stats/server.rs | 24 +-- tests/ruby/admin_spec.rb | 322 +------------------------------- tests/ruby/stats_spec.rb | 369 +++++++++++++++++++++++++++++++++++++ 11 files changed, 552 insertions(+), 717 deletions(-) create mode 100644 tests/ruby/stats_spec.rb diff --git a/src/admin.rs b/src/admin.rs index ceba20c..804476c 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,4 +1,5 @@ use crate::pool::BanReason; +use crate::stats::pool::PoolStats; use bytes::{Buf, BufMut, BytesMut}; use log::{error, info, trace}; use nix::sys::signal::{self, Signal}; @@ -14,7 +15,7 @@ use crate::errors::Error; use crate::messages::*; use crate::pool::ClientServerMap; use crate::pool::{get_all_pools, get_pool}; -use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState}; +use crate::stats::{get_client_stats, get_server_stats, ClientState, ServerState}; pub fn generate_server_info_for_admin() -> BytesMut { let mut server_info = BytesMut::new(); @@ -254,39 +255,12 @@ async fn show_pools(stream: &mut T) -> Result<(), Error> where T: tokio::io::AsyncWrite + std::marker::Unpin, { - let all_pool_stats = get_pool_stats(); - - let columns = vec![ - ("database", DataType::Text), - ("user", DataType::Text), - ("pool_mode", DataType::Text), - ("cl_idle", DataType::Numeric), - ("cl_active", DataType::Numeric), - ("cl_waiting", DataType::Numeric), - ("cl_cancel_req", DataType::Numeric), - ("sv_active", DataType::Numeric), - ("sv_idle", DataType::Numeric), - ("sv_used", DataType::Numeric), - ("sv_tested", DataType::Numeric), - ("sv_login", DataType::Numeric), - ("maxwait", DataType::Numeric), - ("maxwait_us", DataType::Numeric), - ]; - + let pool_lookup = PoolStats::construct_pool_lookup(); let mut res = BytesMut::new(); - 
res.put(row_description(&columns)); - - for ((_user_pool, _pool), pool_stats) in all_pool_stats { - let mut row = vec![ - pool_stats.database(), - pool_stats.user(), - pool_stats.pool_mode().to_string(), - ]; - pool_stats.populate_row(&mut row); - pool_stats.clear_maxwait(); - res.put(data_row(&row)); - } - + res.put(row_description(&PoolStats::generate_header())); + pool_lookup.iter().for_each(|(_identifier, pool_stats)| { + res.put(data_row(&pool_stats.generate_row())); + }); res.put(command_complete("SHOW")); // ReadyForQuery diff --git a/src/client.rs b/src/client.rs index 24d76bd..1ff558b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,7 +20,7 @@ use crate::plugins::PluginOutput; use crate::pool::{get_pool, ClientServerMap, ConnectionPool}; use crate::query_router::{Command, QueryRouter}; use crate::server::Server; -use crate::stats::{ClientStats, PoolStats, ServerStats}; +use crate::stats::{ClientStats, ServerStats}; use crate::tls::Tls; use tokio_rustls::server::TlsStream; @@ -654,24 +654,12 @@ where ready_for_query(&mut write).await?; trace!("Startup OK"); - let pool_stats = match get_pool(pool_name, username) { - Some(pool) => { - if !admin { - pool.stats - } else { - Arc::new(PoolStats::default()) - } - } - None => Arc::new(PoolStats::default()), - }; - let stats = Arc::new(ClientStats::new( process_id, application_name, username, pool_name, tokio::time::Instant::now(), - pool_stats, )); Ok(Client { diff --git a/src/mirrors.rs b/src/mirrors.rs index 7e2c9a0..0f2b02c 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -7,8 +7,7 @@ use bytes::{Bytes, BytesMut}; use parking_lot::RwLock; use crate::config::{get_config, Address, Role, User}; -use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool}; -use crate::stats::PoolStats; +use crate::pool::{ClientServerMap, ServerPool}; use log::{error, info, trace, warn}; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -24,7 +23,7 @@ impl MirroredClient { async fn create_pool(&self) -> Pool { let 
config = get_config(); let default = std::time::Duration::from_millis(10_000).as_millis() as u64; - let (connection_timeout, idle_timeout, cfg) = + let (connection_timeout, idle_timeout, _cfg) = match config.pools.get(&self.address.pool_name) { Some(cfg) => ( cfg.connect_timeout.unwrap_or(default), @@ -34,14 +33,11 @@ impl MirroredClient { None => (default, default, crate::config::Pool::default()), }; - let identifier = PoolIdentifier::new(&self.database, &self.user.username); - let manager = ServerPool::new( self.address.clone(), self.user.clone(), self.database.as_str(), ClientServerMap::default(), - Arc::new(PoolStats::new(identifier, cfg.clone())), Arc::new(RwLock::new(None)), None, true, diff --git a/src/pool.rs b/src/pool.rs index a0b0c4d..4fa2775 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -10,6 +10,7 @@ use rand::seq::SliceRandom; use rand::thread_rng; use regex::Regex; use std::collections::HashMap; +use std::fmt::{Display, Formatter}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -26,7 +27,7 @@ use crate::auth_passthrough::AuthPassthrough; use crate::plugins::prewarmer; use crate::server::Server; use crate::sharding::ShardingFunction; -use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats}; +use crate::stats::{AddressStats, ClientStats, ServerStats}; pub type ProcessId = i32; pub type SecretKey = i32; @@ -76,6 +77,12 @@ impl PoolIdentifier { } } +impl Display for PoolIdentifier { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}@{}", self.user, self.db) + } +} + impl From<&Address> for PoolIdentifier { fn from(address: &Address) -> PoolIdentifier { PoolIdentifier::new(&address.database, &address.username) @@ -202,9 +209,6 @@ pub struct ConnectionPool { paused: Arc, paused_waiter: Arc, - /// Statistics. 
- pub stats: Arc, - /// AuthInfo pub auth_hash: Arc>>, } @@ -254,10 +258,6 @@ impl ConnectionPool { .clone() .into_keys() .collect::>(); - let pool_stats = Arc::new(PoolStats::new(identifier, pool_config.clone())); - - // Allow the pool to be seen in statistics - pool_stats.register(pool_stats.clone()); // Sort by shard number to ensure consistency. shard_ids.sort_by_key(|k| k.parse::().unwrap()); @@ -358,7 +358,6 @@ impl ConnectionPool { user.clone(), &shard.database, client_server_map.clone(), - pool_stats.clone(), pool_auth_hash.clone(), match pool_config.plugins { Some(ref plugins) => Some(plugins.clone()), @@ -429,7 +428,6 @@ impl ConnectionPool { let pool = ConnectionPool { databases: shards, - stats: pool_stats, addresses, banlist: Arc::new(RwLock::new(banlist)), config_hash: new_pool_hash_value, @@ -610,6 +608,10 @@ impl ConnectionPool { }); } + // Indicate we're waiting on a server connection from a pool. + let now = Instant::now(); + client_stats.waiting(); + while !candidates.is_empty() { // Get the next candidate let address = match candidates.pop() { @@ -628,10 +630,6 @@ impl ConnectionPool { } } - // Indicate we're waiting on a server connection from a pool. 
- let now = Instant::now(); - client_stats.waiting(); - // Check if we can connect let mut conn = match self.databases[address.shard][address.address_index] .get() @@ -669,7 +667,7 @@ impl ConnectionPool { .stats() .checkout_time(checkout_time, client_stats.application_name()); server.stats().active(client_stats.application_name()); - + client_stats.active(); return Ok((conn, address.clone())); } @@ -677,11 +675,19 @@ impl ConnectionPool { .run_health_check(address, server, now, client_stats) .await { + let checkout_time: u64 = now.elapsed().as_micros() as u64; + client_stats.checkout_time(checkout_time); + server + .stats() + .checkout_time(checkout_time, client_stats.application_name()); + server.stats().active(client_stats.application_name()); + client_stats.active(); return Ok((conn, address.clone())); } else { continue; } } + client_stats.idle(); Err(Error::AllServersDown) } @@ -927,9 +933,6 @@ pub struct ServerPool { /// Client/server mapping. client_server_map: ClientServerMap, - /// Server statistics. - stats: Arc, - /// Server auth hash (for auth passthrough). 
auth_hash: Arc>>, @@ -946,7 +949,6 @@ impl ServerPool { user: User, database: &str, client_server_map: ClientServerMap, - stats: Arc, auth_hash: Arc>>, plugins: Option, cleanup_connections: bool, @@ -956,7 +958,6 @@ impl ServerPool { user: user.clone(), database: database.to_string(), client_server_map, - stats, auth_hash, plugins, cleanup_connections, @@ -975,7 +976,6 @@ impl ManageConnection for ServerPool { let stats = Arc::new(ServerStats::new( self.address.clone(), - self.stats.clone(), tokio::time::Instant::now(), )); diff --git a/src/prometheus.rs b/src/prometheus.rs index 6e578bf..b488460 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -9,8 +9,9 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use crate::config::Address; -use crate::pool::get_all_pools; -use crate::stats::{get_pool_stats, get_server_stats, ServerStats}; +use crate::pool::{get_all_pools, PoolIdentifier}; +use crate::stats::pool::PoolStats; +use crate::stats::{get_server_stats, ServerStats}; struct MetricHelpType { help: &'static str, @@ -233,10 +234,10 @@ impl PrometheusMetric { Self::from_name(&format!("stats_{}", name), value, labels) } - fn from_pool(pool: &(String, String), name: &str, value: u64) -> Option> { + fn from_pool(pool_id: PoolIdentifier, name: &str, value: u64) -> Option> { let mut labels = HashMap::new(); - labels.insert("pool", pool.0.clone()); - labels.insert("user", pool.1.clone()); + labels.insert("pool", pool_id.db); + labels.insert("user", pool_id.user); Self::from_name(&format!("pools_{}", name), value, labels) } @@ -284,18 +285,15 @@ fn push_address_stats(lines: &mut Vec) { // Adds relevant metrics shown in a SHOW POOLS admin command. 
fn push_pool_stats(lines: &mut Vec) { - let pool_stats = get_pool_stats(); - for (pool, stats) in pool_stats.iter() { - let stats = &**stats; + let pool_stats = PoolStats::construct_pool_lookup(); + for (pool_id, stats) in pool_stats.iter() { for (name, value) in stats.clone() { - if let Some(prometheus_metric) = PrometheusMetric::::from_pool(pool, &name, value) + if let Some(prometheus_metric) = + PrometheusMetric::::from_pool(pool_id.clone(), &name, value) { lines.push(prometheus_metric.to_string()); } else { - warn!( - "Metric {} not implemented for ({},{})", - name, pool.0, pool.1 - ); + warn!("Metric {} not implemented for ({})", name, *pool_id); } } } diff --git a/src/stats.rs b/src/stats.rs index ce076d2..29c00d6 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,4 +1,3 @@ -use crate::pool::PoolIdentifier; /// Statistics and reporting. use arc_swap::ArcSwap; @@ -16,13 +15,11 @@ pub mod pool; pub mod server; pub use address::AddressStats; pub use client::{ClientState, ClientStats}; -pub use pool::PoolStats; pub use server::{ServerState, ServerStats}; /// Convenience types for various stats type ClientStatesLookup = HashMap>; type ServerStatesLookup = HashMap>; -type PoolStatsLookup = HashMap<(String, String), Arc>; /// Stats for individual client connections /// Used in SHOW CLIENTS. @@ -34,11 +31,6 @@ static CLIENT_STATS: Lazy>> = static SERVER_STATS: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default()))); -/// Aggregate stats for each pool (a pool is identified by database name and username) -/// Used in SHOW POOLS. -static POOL_STATS: Lazy>> = - Lazy::new(|| Arc::new(RwLock::new(PoolStatsLookup::default()))); - /// The statistics reporter. An instance is given to each possible source of statistics, /// e.g. client stats, server stats, connection pool stats. 
pub static REPORTER: Lazy> = @@ -80,13 +72,6 @@ impl Reporter { fn server_disconnecting(&self, server_id: i32) { SERVER_STATS.write().remove(&server_id); } - - /// Register a pool with the stats system. - fn pool_register(&self, identifier: PoolIdentifier, stats: Arc) { - POOL_STATS - .write() - .insert((identifier.db, identifier.user), stats); - } } /// The statistics collector which used for calculating averages @@ -139,12 +124,6 @@ pub fn get_server_stats() -> ServerStatesLookup { SERVER_STATS.read().clone() } -/// Get a snapshot of pool statistics. -/// by the `Collector`. -pub fn get_pool_stats() -> PoolStatsLookup { - POOL_STATS.read().clone() -} - /// Get the statistics reporter used to update stats across the pools/clients. pub fn get_reporter() -> Reporter { (*(*REPORTER.load())).clone() diff --git a/src/stats/client.rs b/src/stats/client.rs index 4cbcab2..6a30ec1 100644 --- a/src/stats/client.rs +++ b/src/stats/client.rs @@ -1,4 +1,3 @@ -use super::PoolStats; use super::{get_reporter, Reporter}; use atomic_enum::atomic_enum; use std::sync::atomic::*; @@ -34,12 +33,14 @@ pub struct ClientStats { pool_name: String, connect_time: Instant, - pool_stats: Arc, reporter: Reporter, /// Total time spent waiting for a connection from pool, measures in microseconds pub total_wait_time: Arc, + /// Maximum time spent waiting for a connection from pool, measures in microseconds + pub max_wait_time: Arc, + /// Current state of the client pub state: Arc, @@ -61,8 +62,8 @@ impl Default for ClientStats { application_name: String::new(), username: String::new(), pool_name: String::new(), - pool_stats: Arc::new(PoolStats::default()), total_wait_time: Arc::new(AtomicU64::new(0)), + max_wait_time: Arc::new(AtomicU64::new(0)), state: Arc::new(AtomicClientState::new(ClientState::Idle)), transaction_count: Arc::new(AtomicU64::new(0)), query_count: Arc::new(AtomicU64::new(0)), @@ -79,11 +80,9 @@ impl ClientStats { username: &str, pool_name: &str, connect_time: Instant, - 
pool_stats: Arc, ) -> Self { Self { client_id, - pool_stats, connect_time, application_name: application_name.to_string(), username: username.to_string(), @@ -96,8 +95,6 @@ impl ClientStats { /// update metrics on the corresponding pool. pub fn disconnect(&self) { self.reporter.client_disconnecting(self.client_id); - self.pool_stats - .client_disconnect(self.state.load(Ordering::Relaxed)) } /// Register a client with the stats system. The stats system uses client_id @@ -105,27 +102,20 @@ impl ClientStats { pub fn register(&self, stats: Arc) { self.reporter.client_register(self.client_id, stats); self.state.store(ClientState::Idle, Ordering::Relaxed); - self.pool_stats.cl_idle.fetch_add(1, Ordering::Relaxed); } /// Reports a client is done querying the server and is no longer assigned a server connection pub fn idle(&self) { - self.pool_stats - .client_idle(self.state.load(Ordering::Relaxed)); self.state.store(ClientState::Idle, Ordering::Relaxed); } /// Reports a client is waiting for a connection pub fn waiting(&self) { - self.pool_stats - .client_waiting(self.state.load(Ordering::Relaxed)); self.state.store(ClientState::Waiting, Ordering::Relaxed); } /// Reports a client is done waiting for a connection and is about to query the server. 
pub fn active(&self) { - self.pool_stats - .client_active(self.state.load(Ordering::Relaxed)); self.state.store(ClientState::Active, Ordering::Relaxed); } @@ -144,6 +134,8 @@ impl ClientStats { pub fn checkout_time(&self, microseconds: u64) { self.total_wait_time .fetch_add(microseconds, Ordering::Relaxed); + self.max_wait_time + .fetch_max(microseconds, Ordering::Relaxed); } /// Report a query executed by a client against a server diff --git a/src/stats/pool.rs b/src/stats/pool.rs index 1b01ef2..d3ac78e 100644 --- a/src/stats/pool.rs +++ b/src/stats/pool.rs @@ -1,36 +1,131 @@ -use crate::config::Pool; -use crate::config::PoolMode; -use crate::pool::PoolIdentifier; -use std::sync::atomic::*; -use std::sync::Arc; +use log::debug; -use super::get_reporter; -use super::Reporter; use super::{ClientState, ServerState}; +use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier}; +use std::collections::HashMap; +use std::sync::atomic::*; -#[derive(Debug, Clone, Default)] +use crate::pool::get_all_pools; + +#[derive(Debug, Clone)] /// A struct that holds information about a Pool . 
pub struct PoolStats { - // Pool identifier, cannot be changed after creating the instance - identifier: PoolIdentifier, + pub identifier: PoolIdentifier, + pub mode: PoolMode, + pub cl_idle: u64, + pub cl_active: u64, + pub cl_waiting: u64, + pub cl_cancel_req: u64, + pub sv_active: u64, + pub sv_idle: u64, + pub sv_used: u64, + pub sv_tested: u64, + pub sv_login: u64, + pub maxwait: u64, +} +impl PoolStats { + pub fn new(identifier: PoolIdentifier, mode: PoolMode) -> Self { + PoolStats { + identifier, + mode, + cl_idle: 0, + cl_active: 0, + cl_waiting: 0, + cl_cancel_req: 0, + sv_active: 0, + sv_idle: 0, + sv_used: 0, + sv_tested: 0, + sv_login: 0, + maxwait: 0, + } + } - // Pool Config, cannot be changed after creating the instance - config: Pool, + pub fn construct_pool_lookup() -> HashMap { + let mut map: HashMap = HashMap::new(); + let client_map = super::get_client_stats(); + let server_map = super::get_server_stats(); - // A reference to the global reporter. - reporter: Reporter, + for (identifier, pool) in get_all_pools() { + map.insert( + identifier.clone(), + PoolStats::new(identifier, pool.settings.pool_mode), + ); + } - /// Counters (atomics) - pub cl_idle: Arc, - pub cl_active: Arc, - pub cl_waiting: Arc, - pub cl_cancel_req: Arc, - pub sv_active: Arc, - pub sv_idle: Arc, - pub sv_used: Arc, - pub sv_tested: Arc, - pub sv_login: Arc, - pub maxwait: Arc, + for client in client_map.values() { + match map.get_mut(&PoolIdentifier { + db: client.pool_name(), + user: client.username(), + }) { + Some(pool_stats) => { + match client.state.load(Ordering::Relaxed) { + ClientState::Active => pool_stats.cl_active += 1, + ClientState::Idle => pool_stats.cl_idle += 1, + ClientState::Waiting => pool_stats.cl_waiting += 1, + } + let max_wait = client.max_wait_time.load(Ordering::Relaxed); + pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait); + } + None => debug!("Client from an obselete pool"), + } + } + + for server in server_map.values() { + match 
map.get_mut(&PoolIdentifier { + db: server.pool_name(), + user: server.username(), + }) { + Some(pool_stats) => match server.state.load(Ordering::Relaxed) { + ServerState::Active => pool_stats.sv_active += 1, + ServerState::Idle => pool_stats.sv_idle += 1, + ServerState::Login => pool_stats.sv_login += 1, + ServerState::Tested => pool_stats.sv_tested += 1, + }, + None => debug!("Server from an obselete pool"), + } + } + + return map; + } + + pub fn generate_header() -> Vec<(&'static str, DataType)> { + return vec![ + ("database", DataType::Text), + ("user", DataType::Text), + ("pool_mode", DataType::Text), + ("cl_idle", DataType::Numeric), + ("cl_active", DataType::Numeric), + ("cl_waiting", DataType::Numeric), + ("cl_cancel_req", DataType::Numeric), + ("sv_active", DataType::Numeric), + ("sv_idle", DataType::Numeric), + ("sv_used", DataType::Numeric), + ("sv_tested", DataType::Numeric), + ("sv_login", DataType::Numeric), + ("maxwait", DataType::Numeric), + ("maxwait_us", DataType::Numeric), + ]; + } + + pub fn generate_row(&self) -> Vec { + return vec![ + self.identifier.db.clone(), + self.identifier.user.clone(), + self.mode.to_string(), + self.cl_idle.to_string(), + self.cl_active.to_string(), + self.cl_waiting.to_string(), + self.cl_cancel_req.to_string(), + self.sv_active.to_string(), + self.sv_idle.to_string(), + self.sv_used.to_string(), + self.sv_tested.to_string(), + self.sv_login.to_string(), + (self.maxwait / 1_000_000).to_string(), + (self.maxwait % 1_000_000).to_string(), + ]; + } } impl IntoIterator for PoolStats { @@ -39,236 +134,18 @@ impl IntoIterator for PoolStats { fn into_iter(self) -> Self::IntoIter { vec![ - ("cl_idle".to_string(), self.cl_idle.load(Ordering::Relaxed)), - ( - "cl_active".to_string(), - self.cl_active.load(Ordering::Relaxed), - ), - ( - "cl_waiting".to_string(), - self.cl_waiting.load(Ordering::Relaxed), - ), - ( - "cl_cancel_req".to_string(), - self.cl_cancel_req.load(Ordering::Relaxed), - ), - ( - "sv_active".to_string(), - 
self.sv_active.load(Ordering::Relaxed), - ), - ("sv_idle".to_string(), self.sv_idle.load(Ordering::Relaxed)), - ("sv_used".to_string(), self.sv_used.load(Ordering::Relaxed)), - ( - "sv_tested".to_string(), - self.sv_tested.load(Ordering::Relaxed), - ), - ( - "sv_login".to_string(), - self.sv_login.load(Ordering::Relaxed), - ), - ( - "maxwait".to_string(), - self.maxwait.load(Ordering::Relaxed) / 1_000_000, - ), - ( - "maxwait_us".to_string(), - self.maxwait.load(Ordering::Relaxed) % 1_000_000, - ), + ("cl_idle".to_string(), self.cl_idle), + ("cl_active".to_string(), self.cl_active), + ("cl_waiting".to_string(), self.cl_waiting), + ("cl_cancel_req".to_string(), self.cl_cancel_req), + ("sv_active".to_string(), self.sv_active), + ("sv_idle".to_string(), self.sv_idle), + ("sv_used".to_string(), self.sv_used), + ("sv_tested".to_string(), self.sv_tested), + ("sv_login".to_string(), self.sv_login), + ("maxwait".to_string(), self.maxwait / 1_000_000), + ("maxwait_us".to_string(), self.maxwait % 1_000_000), ] .into_iter() } } - -impl PoolStats { - pub fn new(identifier: PoolIdentifier, config: Pool) -> Self { - Self { - identifier, - config, - reporter: get_reporter(), - ..Default::default() - } - } - - // Getters - pub fn register(&self, stats: Arc) { - self.reporter.pool_register(self.identifier.clone(), stats); - } - - pub fn database(&self) -> String { - self.identifier.db.clone() - } - - pub fn user(&self) -> String { - self.identifier.user.clone() - } - - pub fn pool_mode(&self) -> PoolMode { - self.config.pool_mode - } - - /// Populates an array of strings with counters (used by admin in show pools) - pub fn populate_row(&self, row: &mut Vec) { - for (_key, value) in self.clone() { - row.push(value.to_string()); - } - } - - /// Deletes the maxwait counter, this is done everytime we obtain metrics - pub fn clear_maxwait(&self) { - self.maxwait.store(0, Ordering::Relaxed); - } - - /// Notified when a server of the pool enters login state. 
- /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_login(&self, from: ServerState) { - self.sv_login.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Login { - self.decrease_from_server_state(from); - } - } - - /// Notified when a server of the pool become 'active' - /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_active(&self, from: ServerState) { - self.sv_active.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Active { - self.decrease_from_server_state(from); - } - } - - /// Notified when a server of the pool become 'tested' - /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_tested(&self, from: ServerState) { - self.sv_tested.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Tested { - self.decrease_from_server_state(from); - } - } - - /// Notified when a server of the pool become 'idle' - /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_idle(&self, from: ServerState) { - self.sv_idle.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Idle { - self.decrease_from_server_state(from); - } - } - - /// Notified when a client of the pool become 'waiting' - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. - pub fn client_waiting(&self, from: ClientState) { - if from != ClientState::Waiting { - self.cl_waiting.fetch_add(1, Ordering::Relaxed); - self.decrease_from_client_state(from); - } - } - - /// Notified when a client of the pool become 'active' - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. 
- pub fn client_active(&self, from: ClientState) { - if from != ClientState::Active { - self.cl_active.fetch_add(1, Ordering::Relaxed); - self.decrease_from_client_state(from); - } - } - - /// Notified when a client of the pool become 'idle' - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. - pub fn client_idle(&self, from: ClientState) { - if from != ClientState::Idle { - self.cl_idle.fetch_add(1, Ordering::Relaxed); - self.decrease_from_client_state(from); - } - } - - /// Notified when a client disconnects. - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. - pub fn client_disconnect(&self, from: ClientState) { - let counter = match from { - ClientState::Idle => &self.cl_idle, - ClientState::Waiting => &self.cl_waiting, - ClientState::Active => &self.cl_active, - }; - - Self::decrease_counter(counter.clone()); - } - - /// Notified when a server disconnects. - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. 
- pub fn server_disconnect(&self, from: ServerState) { - let counter = match from { - ServerState::Active => &self.sv_active, - ServerState::Idle => &self.sv_idle, - ServerState::Login => &self.sv_login, - ServerState::Tested => &self.sv_tested, - }; - Self::decrease_counter(counter.clone()); - } - - // helpers for counter decrease - fn decrease_from_server_state(&self, from: ServerState) { - let counter = match from { - ServerState::Tested => &self.sv_tested, - ServerState::Active => &self.sv_active, - ServerState::Idle => &self.sv_idle, - ServerState::Login => &self.sv_login, - }; - Self::decrease_counter(counter.clone()); - } - - fn decrease_from_client_state(&self, from: ClientState) { - let counter = match from { - ClientState::Active => &self.cl_active, - ClientState::Idle => &self.cl_idle, - ClientState::Waiting => &self.cl_waiting, - }; - Self::decrease_counter(counter.clone()); - } - - fn decrease_counter(value: Arc) { - if value.load(Ordering::Relaxed) > 0 { - value.fetch_sub(1, Ordering::Relaxed); - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_decrease() { - let stat: PoolStats = PoolStats::default(); - stat.server_login(ServerState::Login); - stat.server_idle(ServerState::Login); - assert_eq!(stat.sv_login.load(Ordering::Relaxed), 0); - assert_eq!(stat.sv_idle.load(Ordering::Relaxed), 1); - } -} diff --git a/src/stats/server.rs b/src/stats/server.rs index a327fa3..e156ee0 100644 --- a/src/stats/server.rs +++ b/src/stats/server.rs @@ -1,5 +1,4 @@ use super::AddressStats; -use super::PoolStats; use super::{get_reporter, Reporter}; use crate::config::Address; use atomic_enum::atomic_enum; @@ -38,7 +37,6 @@ pub struct ServerStats { address: Address, connect_time: Instant, - pool_stats: Arc, reporter: Reporter, /// Data @@ -57,7 +55,6 @@ impl Default for ServerStats { server_id: 0, application_name: Arc::new(RwLock::new(String::new())), address: Address::default(), - pool_stats: Arc::new(PoolStats::default()), connect_time: 
Instant::now(), state: Arc::new(AtomicServerState::new(ServerState::Login)), bytes_sent: Arc::new(AtomicU64::new(0)), @@ -71,10 +68,9 @@ impl Default for ServerStats { } impl ServerStats { - pub fn new(address: Address, pool_stats: Arc, connect_time: Instant) -> Self { + pub fn new(address: Address, connect_time: Instant) -> Self { Self { address, - pool_stats, connect_time, server_id: rand::random::(), ..Default::default() @@ -96,9 +92,6 @@ impl ServerStats { /// Reports a server connection is no longer assigned to a client /// and is available for the next client to pick it up pub fn idle(&self) { - self.pool_stats - .server_idle(self.state.load(Ordering::Relaxed)); - self.state.store(ServerState::Idle, Ordering::Relaxed); } @@ -106,22 +99,16 @@ impl ServerStats { /// Also updates metrics on the pool regarding server usage. pub fn disconnect(&self) { self.reporter.server_disconnecting(self.server_id); - self.pool_stats - .server_disconnect(self.state.load(Ordering::Relaxed)) } /// Reports a server connection is being tested before being given to a client. pub fn tested(&self) { self.set_undefined_application(); - self.pool_stats - .server_tested(self.state.load(Ordering::Relaxed)); self.state.store(ServerState::Tested, Ordering::Relaxed); } /// Reports a server connection is attempting to login. 
pub fn login(&self) { - self.pool_stats - .server_login(self.state.load(Ordering::Relaxed)); self.state.store(ServerState::Login, Ordering::Relaxed); self.set_undefined_application(); } @@ -129,8 +116,6 @@ impl ServerStats { /// Reports a server connection has been assigned to a client that /// is about to query the server pub fn active(&self, application_name: String) { - self.pool_stats - .server_active(self.state.load(Ordering::Relaxed)); self.state.store(ServerState::Active, Ordering::Relaxed); self.set_application(application_name); } @@ -152,11 +137,11 @@ impl ServerStats { // Helper methods for show_servers pub fn pool_name(&self) -> String { - self.pool_stats.database() + self.address.pool_name.clone() } pub fn username(&self) -> String { - self.pool_stats.user() + self.address.username.clone() } pub fn address_name(&self) -> String { @@ -180,9 +165,6 @@ impl ServerStats { // Update server stats and address aggregation stats self.set_application(application_name); self.address.stats.wait_time_add(microseconds); - self.pool_stats - .maxwait - .fetch_max(microseconds, Ordering::Relaxed); } /// Report a query executed by a client against a server diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index f93b1a6..9a85235 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -11,326 +11,6 @@ describe "Admin" do processes.pgcat.shutdown end - describe "SHOW STATS" do - context "clients connect and make one query" do - it "updates *_query_time and *_wait_time" do - connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - Thread.new { c.async_exec("SELECT pg_sleep(0.25)") } - end - sleep(1) - connections.map(&:close) - - # wait for averages to be calculated, we shouldn't do this too often - sleep(15.5) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW STATS")[0] - admin_conn.close - 
expect(results["total_query_time"].to_i).to be_within(200).of(750) - expect(results["avg_query_time"].to_i).to be_within(50).of(250) - - expect(results["total_wait_time"].to_i).to_not eq(0) - expect(results["avg_wait_time"].to_i).to_not eq(0) - end - end - end - - describe "SHOW POOLS" do - context "bad credentials" do - it "does not change any stats" do - bad_password_url = URI(pgcat_conn_str) - bad_password_url.password = "wrong" - expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad) - - sleep(1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - - expect(results["sv_idle"]).to eq("1") - end - end - - context "bad database name" do - it "does not change any stats" do - bad_db_url = URI(pgcat_conn_str) - bad_db_url.path = "/wrong_db" - expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad) - - sleep(1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - - expect(results["sv_idle"]).to eq("1") - end - end - - context "client connects but issues no queries" do - it "only affects cl_idle stats" do - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - - before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"] - connections = Array.new(20) { PG::connect(pgcat_conn_str) } - sleep(1) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req 
sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("20") - expect(results["sv_idle"]).to eq(before_test) - - connections.map(&:close) - sleep(1.1) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["sv_idle"]).to eq(before_test) - end - end - - context "clients connect and make one query" do - it "only affects cl_idle, sv_idle stats" do - connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - Thread.new { c.async_exec("SELECT pg_sleep(2.5)") } - end - - sleep(1.1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_active"]).to eq("5") - expect(results["sv_active"]).to eq("5") - - sleep(3) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("5") - expect(results["sv_idle"]).to eq("5") - - connections.map(&:close) - sleep(1) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - 
end - expect(results["sv_idle"]).to eq("5") - end - end - - context "client connects and opens a transaction and closes connection uncleanly" do - it "produces correct statistics" do - connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - Thread.new do - c.async_exec("BEGIN") - c.async_exec("SELECT pg_sleep(0.01)") - c.close - end - end - - sleep(1.1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["sv_idle"]).to eq("5") - end - end - - context "client fail to checkout connection from the pool" do - it "counts clients as idle" do - new_configs = processes.pgcat.current_config - new_configs["general"]["connect_timeout"] = 500 - new_configs["general"]["ban_time"] = 1 - new_configs["general"]["shutdown_timeout"] = 1 - new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 - processes.pgcat.update_config(new_configs) - processes.pgcat.reload_config - - threads = [] - connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError } - end - - sleep(2) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("5") - expect(results["sv_idle"]).to eq("1") - - threads.map(&:join) - connections.map(&:close) - end - end - - context "clients connects and 
disconnect normally" do - let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } - - it 'shows the same number of clients before and after' do - clients_before = clients_connected_to_pool(processes: processes) - threads = [] - connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT 1") } - end - clients_between = clients_connected_to_pool(processes: processes) - expect(clients_before).not_to eq(clients_between) - connections.each(&:close) - clients_after = clients_connected_to_pool(processes: processes) - expect(clients_before).to eq(clients_after) - end - end - - context "clients connects and disconnect abruptly" do - let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } - - it 'shows the same number of clients before and after' do - threads = [] - connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT 1") } - end - clients_before = clients_connected_to_pool(processes: processes) - random_string = (0...8).map { (65 + rand(26)).chr }.join - connection_string = "#{pgcat_conn_str}?application_name=#{random_string}" - faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null") - sleep(1) - # psql starts two processes, we only know the pid of the parent, this - # ensure both are killed - `pkill -9 -f '#{random_string}'` - Process.wait(faulty_client) - clients_after = clients_connected_to_pool(processes: processes) - expect(clients_before).to eq(clients_after) - end - end - - context "clients overwhelm server pools" do - let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } - - it "cl_waiting is updated to show it" do - threads = [] - connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { 
c.async_exec("SELECT pg_sleep(1.5)") } - end - - sleep(1.1) # Allow time for stats to update - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - - expect(results["cl_waiting"]).to eq("2") - expect(results["cl_active"]).to eq("2") - expect(results["sv_active"]).to eq("2") - - sleep(2.5) # Allow time for stats to update - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("4") - expect(results["sv_idle"]).to eq("2") - - threads.map(&:join) - connections.map(&:close) - end - - it "show correct max_wait" do - threads = [] - connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") } - end - - sleep(2.5) # Allow time for stats to update - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - - expect(results["maxwait"]).to eq("1") - expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000) - - sleep(4.5) # Allow time for stats to update - results = admin_conn.async_exec("SHOW POOLS")[0] - expect(results["maxwait"]).to eq("0") - - threads.map(&:join) - connections.map(&:close) - end - end - end - - describe "SHOW CLIENTS" do - it "reports correct number and application names" do - conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") - connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") } - - admin_conn = 
PG::connect(processes.pgcat.admin_connection_string) - sleep(1) # Wait for stats to be updated - - results = admin_conn.async_exec("SHOW CLIENTS") - expect(results.count).to eq(21) # count admin clients - expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8) - expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1) - - connections[0..5].map(&:close) - sleep(1) # Wait for stats to be updated - results = admin_conn.async_exec("SHOW CLIENTS") - expect(results.count).to eq(15) - - connections[6..].map(&:close) - sleep(1) # Wait for stats to be updated - expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1) - admin_conn.close - end - - it "reports correct number of queries and transactions" do - conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") - - connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") } - connections.each do |c| - c.async_exec("SELECT 1") - c.async_exec("SELECT 2") - c.async_exec("SELECT 3") - c.async_exec("BEGIN") - c.async_exec("SELECT 4") - c.async_exec("SELECT 5") - c.async_exec("COMMIT") - end - - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - sleep(1) # Wait for stats to be updated - - results = admin_conn.async_exec("SHOW CLIENTS") - expect(results.count).to eq(3) - normal_client_results = results.reject { |r| r["database"] == "pgcat" } - expect(normal_client_results[0]["transaction_count"]).to eq("4") - expect(normal_client_results[1]["transaction_count"]).to eq("4") - expect(normal_client_results[0]["query_count"]).to eq("7") - expect(normal_client_results[1]["query_count"]).to eq("7") - - admin_conn.close - connections.map(&:close) - end - end - describe "Manual Banning" do let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 10) } before do @@ -401,7 +81,7 @@ describe "Admin" do end end - describe "SHOW users" do + describe "SHOW USERS" do it "returns the right users" do 
admin_conn = PG::connect(processes.pgcat.admin_connection_string) results = admin_conn.async_exec("SHOW USERS")[0] diff --git a/tests/ruby/stats_spec.rb b/tests/ruby/stats_spec.rb new file mode 100644 index 0000000..132b80a --- /dev/null +++ b/tests/ruby/stats_spec.rb @@ -0,0 +1,369 @@ +# frozen_string_literal: true +require 'open3' +require_relative 'spec_helper' + +describe "Stats" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") } + + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + describe "SHOW STATS" do + context "clients connect and make one query" do + it "updates *_query_time and *_wait_time" do + connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new { c.async_exec("SELECT pg_sleep(0.25)") } + end + sleep(1) + connections.map(&:close) + + # wait for averages to be calculated, we shouldn't do this too often + sleep(15.5) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW STATS")[0] + admin_conn.close + expect(results["total_query_time"].to_i).to be_within(200).of(750) + expect(results["avg_query_time"].to_i).to be_within(50).of(250) + + expect(results["total_wait_time"].to_i).to_not eq(0) + expect(results["avg_wait_time"].to_i).to_not eq(0) + end + end + end + + describe "SHOW POOLS" do + context "bad credentials" do + it "does not change any stats" do + bad_password_url = URI(pgcat_conn_str) + bad_password_url.password = "wrong" + expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad) + + sleep(1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each 
do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["sv_idle"]).to eq("1") + end + end + + context "bad database name" do + it "does not change any stats" do + bad_db_url = URI(pgcat_conn_str) + bad_db_url.path = "/wrong_db" + expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad) + + sleep(1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["sv_idle"]).to eq("1") + end + end + + context "client connects but issues no queries" do + it "only affects cl_idle stats" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + + before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"] + connections = Array.new(20) { PG::connect(pgcat_conn_str) } + sleep(1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("20") + expect(results["sv_idle"]).to eq(before_test) + + connections.map(&:close) + sleep(1.1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["sv_idle"]).to eq(before_test) + end + end + + context "clients connect and make one query" do + it "only affects cl_idle, sv_idle stats" do + connections = Array.new(5) { 
PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new { c.async_exec("SELECT pg_sleep(2.5)") } + end + + sleep(1.1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_active"]).to eq("5") + expect(results["sv_active"]).to eq("5") + + sleep(3) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("5") + expect(results["sv_idle"]).to eq("5") + + connections.map(&:close) + sleep(1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["sv_idle"]).to eq("5") + end + end + + context "client connects and opens a transaction and closes connection uncleanly" do + it "produces correct statistics" do + connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new do + c.async_exec("BEGIN") + c.async_exec("SELECT pg_sleep(0.01)") + c.close + end + end + + sleep(1.1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + 
expect(results["sv_idle"]).to eq("5") + end + end + + context "client fail to checkout connection from the pool" do + it "counts clients as idle" do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 500 + new_configs["general"]["ban_time"] = 1 + new_configs["general"]["shutdown_timeout"] = 1 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + + threads = [] + connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError } + end + + sleep(2) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("5") + expect(results["sv_idle"]).to eq("1") + + threads.map(&:join) + connections.map(&:close) + end + end + + context "clients connects and disconnect normally" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } + + it 'shows the same number of clients before and after' do + clients_before = clients_connected_to_pool(processes: processes) + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT 1") rescue nil } + end + clients_between = clients_connected_to_pool(processes: processes) + expect(clients_before).not_to eq(clients_between) + connections.each(&:close) + clients_after = clients_connected_to_pool(processes: processes) + expect(clients_before).to eq(clients_after) + end + end + + context "clients connects and disconnect abruptly" do + 
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + + it 'shows the same number of clients before and after' do + threads = [] + connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT 1") } + end + clients_before = clients_connected_to_pool(processes: processes) + random_string = (0...8).map { (65 + rand(26)).chr }.join + connection_string = "#{pgcat_conn_str}?application_name=#{random_string}" + faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null") + sleep(1) + # psql starts two processes, we only know the pid of the parent, this + # ensure both are killed + `pkill -9 -f '#{random_string}'` + Process.wait(faulty_client) + clients_after = clients_connected_to_pool(processes: processes) + expect(clients_before).to eq(clients_after) + end + end + + context "clients overwhelm server pools" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } + + it "cl_waiting is updated to show it" do + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") } + end + + sleep(1.1) # Allow time for stats to update + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["cl_waiting"]).to eq("2") + expect(results["cl_active"]).to eq("2") + expect(results["sv_active"]).to eq("2") + + sleep(2.5) # Allow time for stats to update + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s| + raise StandardError, "Field #{s} 
was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("4") + expect(results["sv_idle"]).to eq("2") + + threads.map(&:join) + connections.map(&:close) + end + + it "show correct max_wait" do + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil } + end + + sleep(2.5) # Allow time for stats to update + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + + expect(results["maxwait"]).to eq("1") + expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000) + connections.map(&:close) + + sleep(4.5) # Allow time for stats to update + results = admin_conn.async_exec("SHOW POOLS")[0] + expect(results["maxwait"]).to eq("0") + + threads.map(&:join) + end + end + end + + describe "SHOW CLIENTS" do + it "reports correct number and application names" do + conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") + connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") } + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + sleep(1) # Wait for stats to be updated + + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(21) # count admin clients + expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8) + expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1) + + connections[0..5].map(&:close) + sleep(1) # Wait for stats to be updated + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(15) + + connections[6..].map(&:close) + sleep(1) # Wait for stats to be updated + expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1) + admin_conn.close + end + + it "reports correct number of queries and 
transactions" do + conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") + + connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") } + connections.each do |c| + c.async_exec("SELECT 1") + c.async_exec("SELECT 2") + c.async_exec("SELECT 3") + c.async_exec("BEGIN") + c.async_exec("SELECT 4") + c.async_exec("SELECT 5") + c.async_exec("COMMIT") + end + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + sleep(1) # Wait for stats to be updated + + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(3) + normal_client_results = results.reject { |r| r["database"] == "pgcat" } + expect(normal_client_results[0]["transaction_count"]).to eq("4") + expect(normal_client_results[1]["transaction_count"]).to eq("4") + expect(normal_client_results[0]["query_count"]).to eq("7") + expect(normal_client_results[1]["query_count"]).to eq("7") + + admin_conn.close + connections.map(&:close) + end + end + + + describe "Query Storm" do + context "when the proxy receives overwhelmingly large number of short quick queries" do + it "should not have lingering clients or active servers" do + new_configs = processes.pgcat.current_config + + new_configs["general"]["connect_timeout"] = 500 + new_configs["general"]["ban_time"] = 1 + new_configs["general"]["shutdown_timeout"] = 1 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + + Array.new(40) do + Thread.new do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SELECT pg_sleep(0.1)") + rescue PG::SystemError + ensure + conn.close + end + end.each(&:join) + + sleep 1 + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_waiting cl_cancel_req sv_used sv_tested sv_login].each do |s| + raise StandardError, "Field #{s} was 
expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + admin_conn.close + end + end + end +end