diff --git a/src/admin.rs b/src/admin.rs index ceba20c..804476c 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,4 +1,5 @@ use crate::pool::BanReason; +use crate::stats::pool::PoolStats; use bytes::{Buf, BufMut, BytesMut}; use log::{error, info, trace}; use nix::sys::signal::{self, Signal}; @@ -14,7 +15,7 @@ use crate::errors::Error; use crate::messages::*; use crate::pool::ClientServerMap; use crate::pool::{get_all_pools, get_pool}; -use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState}; +use crate::stats::{get_client_stats, get_server_stats, ClientState, ServerState}; pub fn generate_server_info_for_admin() -> BytesMut { let mut server_info = BytesMut::new(); @@ -254,39 +255,12 @@ async fn show_pools(stream: &mut T) -> Result<(), Error> where T: tokio::io::AsyncWrite + std::marker::Unpin, { - let all_pool_stats = get_pool_stats(); - - let columns = vec![ - ("database", DataType::Text), - ("user", DataType::Text), - ("pool_mode", DataType::Text), - ("cl_idle", DataType::Numeric), - ("cl_active", DataType::Numeric), - ("cl_waiting", DataType::Numeric), - ("cl_cancel_req", DataType::Numeric), - ("sv_active", DataType::Numeric), - ("sv_idle", DataType::Numeric), - ("sv_used", DataType::Numeric), - ("sv_tested", DataType::Numeric), - ("sv_login", DataType::Numeric), - ("maxwait", DataType::Numeric), - ("maxwait_us", DataType::Numeric), - ]; - + let pool_lookup = PoolStats::construct_pool_lookup(); let mut res = BytesMut::new(); - res.put(row_description(&columns)); - - for ((_user_pool, _pool), pool_stats) in all_pool_stats { - let mut row = vec![ - pool_stats.database(), - pool_stats.user(), - pool_stats.pool_mode().to_string(), - ]; - pool_stats.populate_row(&mut row); - pool_stats.clear_maxwait(); - res.put(data_row(&row)); - } - + res.put(row_description(&PoolStats::generate_header())); + pool_lookup.iter().for_each(|(_identifier, pool_stats)| { + res.put(data_row(&pool_stats.generate_row())); + }); 
res.put(command_complete("SHOW")); // ReadyForQuery diff --git a/src/client.rs b/src/client.rs index 24d76bd..1ff558b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,7 +20,7 @@ use crate::plugins::PluginOutput; use crate::pool::{get_pool, ClientServerMap, ConnectionPool}; use crate::query_router::{Command, QueryRouter}; use crate::server::Server; -use crate::stats::{ClientStats, PoolStats, ServerStats}; +use crate::stats::{ClientStats, ServerStats}; use crate::tls::Tls; use tokio_rustls::server::TlsStream; @@ -654,24 +654,12 @@ where ready_for_query(&mut write).await?; trace!("Startup OK"); - let pool_stats = match get_pool(pool_name, username) { - Some(pool) => { - if !admin { - pool.stats - } else { - Arc::new(PoolStats::default()) - } - } - None => Arc::new(PoolStats::default()), - }; - let stats = Arc::new(ClientStats::new( process_id, application_name, username, pool_name, tokio::time::Instant::now(), - pool_stats, )); Ok(Client { diff --git a/src/mirrors.rs b/src/mirrors.rs index 7e2c9a0..0f2b02c 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -7,8 +7,7 @@ use bytes::{Bytes, BytesMut}; use parking_lot::RwLock; use crate::config::{get_config, Address, Role, User}; -use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool}; -use crate::stats::PoolStats; +use crate::pool::{ClientServerMap, ServerPool}; use log::{error, info, trace, warn}; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -24,7 +23,7 @@ impl MirroredClient { async fn create_pool(&self) -> Pool { let config = get_config(); let default = std::time::Duration::from_millis(10_000).as_millis() as u64; - let (connection_timeout, idle_timeout, cfg) = + let (connection_timeout, idle_timeout, _cfg) = match config.pools.get(&self.address.pool_name) { Some(cfg) => ( cfg.connect_timeout.unwrap_or(default), @@ -34,14 +33,11 @@ impl MirroredClient { None => (default, default, crate::config::Pool::default()), }; - let identifier = PoolIdentifier::new(&self.database, &self.user.username); - 
let manager = ServerPool::new( self.address.clone(), self.user.clone(), self.database.as_str(), ClientServerMap::default(), - Arc::new(PoolStats::new(identifier, cfg.clone())), Arc::new(RwLock::new(None)), None, true, diff --git a/src/pool.rs b/src/pool.rs index a0b0c4d..4fa2775 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -10,6 +10,7 @@ use rand::seq::SliceRandom; use rand::thread_rng; use regex::Regex; use std::collections::HashMap; +use std::fmt::{Display, Formatter}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -26,7 +27,7 @@ use crate::auth_passthrough::AuthPassthrough; use crate::plugins::prewarmer; use crate::server::Server; use crate::sharding::ShardingFunction; -use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats}; +use crate::stats::{AddressStats, ClientStats, ServerStats}; pub type ProcessId = i32; pub type SecretKey = i32; @@ -76,6 +77,12 @@ impl PoolIdentifier { } } +impl Display for PoolIdentifier { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}@{}", self.user, self.db) + } +} + impl From<&Address> for PoolIdentifier { fn from(address: &Address) -> PoolIdentifier { PoolIdentifier::new(&address.database, &address.username) @@ -202,9 +209,6 @@ pub struct ConnectionPool { paused: Arc, paused_waiter: Arc, - /// Statistics. - pub stats: Arc, - /// AuthInfo pub auth_hash: Arc>>, } @@ -254,10 +258,6 @@ impl ConnectionPool { .clone() .into_keys() .collect::>(); - let pool_stats = Arc::new(PoolStats::new(identifier, pool_config.clone())); - - // Allow the pool to be seen in statistics - pool_stats.register(pool_stats.clone()); // Sort by shard number to ensure consistency. 
shard_ids.sort_by_key(|k| k.parse::().unwrap()); @@ -358,7 +358,6 @@ impl ConnectionPool { user.clone(), &shard.database, client_server_map.clone(), - pool_stats.clone(), pool_auth_hash.clone(), match pool_config.plugins { Some(ref plugins) => Some(plugins.clone()), @@ -429,7 +428,6 @@ impl ConnectionPool { let pool = ConnectionPool { databases: shards, - stats: pool_stats, addresses, banlist: Arc::new(RwLock::new(banlist)), config_hash: new_pool_hash_value, @@ -610,6 +608,10 @@ impl ConnectionPool { }); } + // Indicate we're waiting on a server connection from a pool. + let now = Instant::now(); + client_stats.waiting(); + while !candidates.is_empty() { // Get the next candidate let address = match candidates.pop() { @@ -628,10 +630,6 @@ impl ConnectionPool { } } - // Indicate we're waiting on a server connection from a pool. - let now = Instant::now(); - client_stats.waiting(); - // Check if we can connect let mut conn = match self.databases[address.shard][address.address_index] .get() @@ -669,7 +667,7 @@ impl ConnectionPool { .stats() .checkout_time(checkout_time, client_stats.application_name()); server.stats().active(client_stats.application_name()); - + client_stats.active(); return Ok((conn, address.clone())); } @@ -677,11 +675,19 @@ impl ConnectionPool { .run_health_check(address, server, now, client_stats) .await { + let checkout_time: u64 = now.elapsed().as_micros() as u64; + client_stats.checkout_time(checkout_time); + server + .stats() + .checkout_time(checkout_time, client_stats.application_name()); + server.stats().active(client_stats.application_name()); + client_stats.active(); return Ok((conn, address.clone())); } else { continue; } } + client_stats.idle(); Err(Error::AllServersDown) } @@ -927,9 +933,6 @@ pub struct ServerPool { /// Client/server mapping. client_server_map: ClientServerMap, - /// Server statistics. - stats: Arc, - /// Server auth hash (for auth passthrough). 
auth_hash: Arc>>, @@ -946,7 +949,6 @@ impl ServerPool { user: User, database: &str, client_server_map: ClientServerMap, - stats: Arc, auth_hash: Arc>>, plugins: Option, cleanup_connections: bool, @@ -956,7 +958,6 @@ impl ServerPool { user: user.clone(), database: database.to_string(), client_server_map, - stats, auth_hash, plugins, cleanup_connections, @@ -975,7 +976,6 @@ impl ManageConnection for ServerPool { let stats = Arc::new(ServerStats::new( self.address.clone(), - self.stats.clone(), tokio::time::Instant::now(), )); diff --git a/src/prometheus.rs b/src/prometheus.rs index 6e578bf..b488460 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -9,8 +9,9 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use crate::config::Address; -use crate::pool::get_all_pools; -use crate::stats::{get_pool_stats, get_server_stats, ServerStats}; +use crate::pool::{get_all_pools, PoolIdentifier}; +use crate::stats::pool::PoolStats; +use crate::stats::{get_server_stats, ServerStats}; struct MetricHelpType { help: &'static str, @@ -233,10 +234,10 @@ impl PrometheusMetric { Self::from_name(&format!("stats_{}", name), value, labels) } - fn from_pool(pool: &(String, String), name: &str, value: u64) -> Option> { + fn from_pool(pool_id: PoolIdentifier, name: &str, value: u64) -> Option> { let mut labels = HashMap::new(); - labels.insert("pool", pool.0.clone()); - labels.insert("user", pool.1.clone()); + labels.insert("pool", pool_id.db); + labels.insert("user", pool_id.user); Self::from_name(&format!("pools_{}", name), value, labels) } @@ -284,18 +285,15 @@ fn push_address_stats(lines: &mut Vec) { // Adds relevant metrics shown in a SHOW POOLS admin command. 
fn push_pool_stats(lines: &mut Vec) { - let pool_stats = get_pool_stats(); - for (pool, stats) in pool_stats.iter() { - let stats = &**stats; + let pool_stats = PoolStats::construct_pool_lookup(); + for (pool_id, stats) in pool_stats.iter() { for (name, value) in stats.clone() { - if let Some(prometheus_metric) = PrometheusMetric::::from_pool(pool, &name, value) + if let Some(prometheus_metric) = + PrometheusMetric::::from_pool(pool_id.clone(), &name, value) { lines.push(prometheus_metric.to_string()); } else { - warn!( - "Metric {} not implemented for ({},{})", - name, pool.0, pool.1 - ); + warn!("Metric {} not implemented for ({})", name, *pool_id); } } } diff --git a/src/stats.rs b/src/stats.rs index ce076d2..29c00d6 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,4 +1,3 @@ -use crate::pool::PoolIdentifier; /// Statistics and reporting. use arc_swap::ArcSwap; @@ -16,13 +15,11 @@ pub mod pool; pub mod server; pub use address::AddressStats; pub use client::{ClientState, ClientStats}; -pub use pool::PoolStats; pub use server::{ServerState, ServerStats}; /// Convenience types for various stats type ClientStatesLookup = HashMap>; type ServerStatesLookup = HashMap>; -type PoolStatsLookup = HashMap<(String, String), Arc>; /// Stats for individual client connections /// Used in SHOW CLIENTS. @@ -34,11 +31,6 @@ static CLIENT_STATS: Lazy>> = static SERVER_STATS: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default()))); -/// Aggregate stats for each pool (a pool is identified by database name and username) -/// Used in SHOW POOLS. -static POOL_STATS: Lazy>> = - Lazy::new(|| Arc::new(RwLock::new(PoolStatsLookup::default()))); - /// The statistics reporter. An instance is given to each possible source of statistics, /// e.g. client stats, server stats, connection pool stats. 
pub static REPORTER: Lazy> = @@ -80,13 +72,6 @@ impl Reporter { fn server_disconnecting(&self, server_id: i32) { SERVER_STATS.write().remove(&server_id); } - - /// Register a pool with the stats system. - fn pool_register(&self, identifier: PoolIdentifier, stats: Arc) { - POOL_STATS - .write() - .insert((identifier.db, identifier.user), stats); - } } /// The statistics collector which used for calculating averages @@ -139,12 +124,6 @@ pub fn get_server_stats() -> ServerStatesLookup { SERVER_STATS.read().clone() } -/// Get a snapshot of pool statistics. -/// by the `Collector`. -pub fn get_pool_stats() -> PoolStatsLookup { - POOL_STATS.read().clone() -} - /// Get the statistics reporter used to update stats across the pools/clients. pub fn get_reporter() -> Reporter { (*(*REPORTER.load())).clone() diff --git a/src/stats/client.rs b/src/stats/client.rs index 4cbcab2..6a30ec1 100644 --- a/src/stats/client.rs +++ b/src/stats/client.rs @@ -1,4 +1,3 @@ -use super::PoolStats; use super::{get_reporter, Reporter}; use atomic_enum::atomic_enum; use std::sync::atomic::*; @@ -34,12 +33,14 @@ pub struct ClientStats { pool_name: String, connect_time: Instant, - pool_stats: Arc, reporter: Reporter, /// Total time spent waiting for a connection from pool, measures in microseconds pub total_wait_time: Arc, + /// Maximum time spent waiting for a connection from pool, measures in microseconds + pub max_wait_time: Arc, + /// Current state of the client pub state: Arc, @@ -61,8 +62,8 @@ impl Default for ClientStats { application_name: String::new(), username: String::new(), pool_name: String::new(), - pool_stats: Arc::new(PoolStats::default()), total_wait_time: Arc::new(AtomicU64::new(0)), + max_wait_time: Arc::new(AtomicU64::new(0)), state: Arc::new(AtomicClientState::new(ClientState::Idle)), transaction_count: Arc::new(AtomicU64::new(0)), query_count: Arc::new(AtomicU64::new(0)), @@ -79,11 +80,9 @@ impl ClientStats { username: &str, pool_name: &str, connect_time: Instant, - 
pool_stats: Arc, ) -> Self { Self { client_id, - pool_stats, connect_time, application_name: application_name.to_string(), username: username.to_string(), @@ -96,8 +95,6 @@ impl ClientStats { /// update metrics on the corresponding pool. pub fn disconnect(&self) { self.reporter.client_disconnecting(self.client_id); - self.pool_stats - .client_disconnect(self.state.load(Ordering::Relaxed)) } /// Register a client with the stats system. The stats system uses client_id @@ -105,27 +102,20 @@ impl ClientStats { pub fn register(&self, stats: Arc) { self.reporter.client_register(self.client_id, stats); self.state.store(ClientState::Idle, Ordering::Relaxed); - self.pool_stats.cl_idle.fetch_add(1, Ordering::Relaxed); } /// Reports a client is done querying the server and is no longer assigned a server connection pub fn idle(&self) { - self.pool_stats - .client_idle(self.state.load(Ordering::Relaxed)); self.state.store(ClientState::Idle, Ordering::Relaxed); } /// Reports a client is waiting for a connection pub fn waiting(&self) { - self.pool_stats - .client_waiting(self.state.load(Ordering::Relaxed)); self.state.store(ClientState::Waiting, Ordering::Relaxed); } /// Reports a client is done waiting for a connection and is about to query the server. 
pub fn active(&self) { - self.pool_stats - .client_active(self.state.load(Ordering::Relaxed)); self.state.store(ClientState::Active, Ordering::Relaxed); } @@ -144,6 +134,8 @@ impl ClientStats { pub fn checkout_time(&self, microseconds: u64) { self.total_wait_time .fetch_add(microseconds, Ordering::Relaxed); + self.max_wait_time + .fetch_max(microseconds, Ordering::Relaxed); } /// Report a query executed by a client against a server diff --git a/src/stats/pool.rs b/src/stats/pool.rs index 1b01ef2..d3ac78e 100644 --- a/src/stats/pool.rs +++ b/src/stats/pool.rs @@ -1,36 +1,131 @@ -use crate::config::Pool; -use crate::config::PoolMode; -use crate::pool::PoolIdentifier; -use std::sync::atomic::*; -use std::sync::Arc; +use log::debug; -use super::get_reporter; -use super::Reporter; use super::{ClientState, ServerState}; +use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier}; +use std::collections::HashMap; +use std::sync::atomic::*; -#[derive(Debug, Clone, Default)] +use crate::pool::get_all_pools; + +#[derive(Debug, Clone)] /// A struct that holds information about a Pool . 
pub struct PoolStats { - // Pool identifier, cannot be changed after creating the instance - identifier: PoolIdentifier, + pub identifier: PoolIdentifier, + pub mode: PoolMode, + pub cl_idle: u64, + pub cl_active: u64, + pub cl_waiting: u64, + pub cl_cancel_req: u64, + pub sv_active: u64, + pub sv_idle: u64, + pub sv_used: u64, + pub sv_tested: u64, + pub sv_login: u64, + pub maxwait: u64, +} +impl PoolStats { + pub fn new(identifier: PoolIdentifier, mode: PoolMode) -> Self { + PoolStats { + identifier, + mode, + cl_idle: 0, + cl_active: 0, + cl_waiting: 0, + cl_cancel_req: 0, + sv_active: 0, + sv_idle: 0, + sv_used: 0, + sv_tested: 0, + sv_login: 0, + maxwait: 0, + } + } - // Pool Config, cannot be changed after creating the instance - config: Pool, + pub fn construct_pool_lookup() -> HashMap { + let mut map: HashMap = HashMap::new(); + let client_map = super::get_client_stats(); + let server_map = super::get_server_stats(); - // A reference to the global reporter. - reporter: Reporter, + for (identifier, pool) in get_all_pools() { + map.insert( + identifier.clone(), + PoolStats::new(identifier, pool.settings.pool_mode), + ); + } - /// Counters (atomics) - pub cl_idle: Arc, - pub cl_active: Arc, - pub cl_waiting: Arc, - pub cl_cancel_req: Arc, - pub sv_active: Arc, - pub sv_idle: Arc, - pub sv_used: Arc, - pub sv_tested: Arc, - pub sv_login: Arc, - pub maxwait: Arc, + for client in client_map.values() { + match map.get_mut(&PoolIdentifier { + db: client.pool_name(), + user: client.username(), + }) { + Some(pool_stats) => { + match client.state.load(Ordering::Relaxed) { + ClientState::Active => pool_stats.cl_active += 1, + ClientState::Idle => pool_stats.cl_idle += 1, + ClientState::Waiting => pool_stats.cl_waiting += 1, + } + let max_wait = client.max_wait_time.load(Ordering::Relaxed); + pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait); + } + None => debug!("Client from an obselete pool"), + } + } + + for server in server_map.values() { + match 
map.get_mut(&PoolIdentifier { + db: server.pool_name(), + user: server.username(), + }) { + Some(pool_stats) => match server.state.load(Ordering::Relaxed) { + ServerState::Active => pool_stats.sv_active += 1, + ServerState::Idle => pool_stats.sv_idle += 1, + ServerState::Login => pool_stats.sv_login += 1, + ServerState::Tested => pool_stats.sv_tested += 1, + }, + None => debug!("Server from an obselete pool"), + } + } + + return map; + } + + pub fn generate_header() -> Vec<(&'static str, DataType)> { + return vec![ + ("database", DataType::Text), + ("user", DataType::Text), + ("pool_mode", DataType::Text), + ("cl_idle", DataType::Numeric), + ("cl_active", DataType::Numeric), + ("cl_waiting", DataType::Numeric), + ("cl_cancel_req", DataType::Numeric), + ("sv_active", DataType::Numeric), + ("sv_idle", DataType::Numeric), + ("sv_used", DataType::Numeric), + ("sv_tested", DataType::Numeric), + ("sv_login", DataType::Numeric), + ("maxwait", DataType::Numeric), + ("maxwait_us", DataType::Numeric), + ]; + } + + pub fn generate_row(&self) -> Vec { + return vec![ + self.identifier.db.clone(), + self.identifier.user.clone(), + self.mode.to_string(), + self.cl_idle.to_string(), + self.cl_active.to_string(), + self.cl_waiting.to_string(), + self.cl_cancel_req.to_string(), + self.sv_active.to_string(), + self.sv_idle.to_string(), + self.sv_used.to_string(), + self.sv_tested.to_string(), + self.sv_login.to_string(), + (self.maxwait / 1_000_000).to_string(), + (self.maxwait % 1_000_000).to_string(), + ]; + } } impl IntoIterator for PoolStats { @@ -39,236 +134,18 @@ impl IntoIterator for PoolStats { fn into_iter(self) -> Self::IntoIter { vec![ - ("cl_idle".to_string(), self.cl_idle.load(Ordering::Relaxed)), - ( - "cl_active".to_string(), - self.cl_active.load(Ordering::Relaxed), - ), - ( - "cl_waiting".to_string(), - self.cl_waiting.load(Ordering::Relaxed), - ), - ( - "cl_cancel_req".to_string(), - self.cl_cancel_req.load(Ordering::Relaxed), - ), - ( - "sv_active".to_string(), - 
self.sv_active.load(Ordering::Relaxed), - ), - ("sv_idle".to_string(), self.sv_idle.load(Ordering::Relaxed)), - ("sv_used".to_string(), self.sv_used.load(Ordering::Relaxed)), - ( - "sv_tested".to_string(), - self.sv_tested.load(Ordering::Relaxed), - ), - ( - "sv_login".to_string(), - self.sv_login.load(Ordering::Relaxed), - ), - ( - "maxwait".to_string(), - self.maxwait.load(Ordering::Relaxed) / 1_000_000, - ), - ( - "maxwait_us".to_string(), - self.maxwait.load(Ordering::Relaxed) % 1_000_000, - ), + ("cl_idle".to_string(), self.cl_idle), + ("cl_active".to_string(), self.cl_active), + ("cl_waiting".to_string(), self.cl_waiting), + ("cl_cancel_req".to_string(), self.cl_cancel_req), + ("sv_active".to_string(), self.sv_active), + ("sv_idle".to_string(), self.sv_idle), + ("sv_used".to_string(), self.sv_used), + ("sv_tested".to_string(), self.sv_tested), + ("sv_login".to_string(), self.sv_login), + ("maxwait".to_string(), self.maxwait / 1_000_000), + ("maxwait_us".to_string(), self.maxwait % 1_000_000), ] .into_iter() } } - -impl PoolStats { - pub fn new(identifier: PoolIdentifier, config: Pool) -> Self { - Self { - identifier, - config, - reporter: get_reporter(), - ..Default::default() - } - } - - // Getters - pub fn register(&self, stats: Arc) { - self.reporter.pool_register(self.identifier.clone(), stats); - } - - pub fn database(&self) -> String { - self.identifier.db.clone() - } - - pub fn user(&self) -> String { - self.identifier.user.clone() - } - - pub fn pool_mode(&self) -> PoolMode { - self.config.pool_mode - } - - /// Populates an array of strings with counters (used by admin in show pools) - pub fn populate_row(&self, row: &mut Vec) { - for (_key, value) in self.clone() { - row.push(value.to_string()); - } - } - - /// Deletes the maxwait counter, this is done everytime we obtain metrics - pub fn clear_maxwait(&self) { - self.maxwait.store(0, Ordering::Relaxed); - } - - /// Notified when a server of the pool enters login state. 
- /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_login(&self, from: ServerState) { - self.sv_login.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Login { - self.decrease_from_server_state(from); - } - } - - /// Notified when a server of the pool become 'active' - /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_active(&self, from: ServerState) { - self.sv_active.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Active { - self.decrease_from_server_state(from); - } - } - - /// Notified when a server of the pool become 'tested' - /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_tested(&self, from: ServerState) { - self.sv_tested.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Tested { - self.decrease_from_server_state(from); - } - } - - /// Notified when a server of the pool become 'idle' - /// - /// Arguments: - /// - /// `from`: The state of the server that notifies. - pub fn server_idle(&self, from: ServerState) { - self.sv_idle.fetch_add(1, Ordering::Relaxed); - if from != ServerState::Idle { - self.decrease_from_server_state(from); - } - } - - /// Notified when a client of the pool become 'waiting' - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. - pub fn client_waiting(&self, from: ClientState) { - if from != ClientState::Waiting { - self.cl_waiting.fetch_add(1, Ordering::Relaxed); - self.decrease_from_client_state(from); - } - } - - /// Notified when a client of the pool become 'active' - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. 
- pub fn client_active(&self, from: ClientState) { - if from != ClientState::Active { - self.cl_active.fetch_add(1, Ordering::Relaxed); - self.decrease_from_client_state(from); - } - } - - /// Notified when a client of the pool become 'idle' - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. - pub fn client_idle(&self, from: ClientState) { - if from != ClientState::Idle { - self.cl_idle.fetch_add(1, Ordering::Relaxed); - self.decrease_from_client_state(from); - } - } - - /// Notified when a client disconnects. - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. - pub fn client_disconnect(&self, from: ClientState) { - let counter = match from { - ClientState::Idle => &self.cl_idle, - ClientState::Waiting => &self.cl_waiting, - ClientState::Active => &self.cl_active, - }; - - Self::decrease_counter(counter.clone()); - } - - /// Notified when a server disconnects. - /// - /// Arguments: - /// - /// `from`: The state of the client that notifies. 
- pub fn server_disconnect(&self, from: ServerState) { - let counter = match from { - ServerState::Active => &self.sv_active, - ServerState::Idle => &self.sv_idle, - ServerState::Login => &self.sv_login, - ServerState::Tested => &self.sv_tested, - }; - Self::decrease_counter(counter.clone()); - } - - // helpers for counter decrease - fn decrease_from_server_state(&self, from: ServerState) { - let counter = match from { - ServerState::Tested => &self.sv_tested, - ServerState::Active => &self.sv_active, - ServerState::Idle => &self.sv_idle, - ServerState::Login => &self.sv_login, - }; - Self::decrease_counter(counter.clone()); - } - - fn decrease_from_client_state(&self, from: ClientState) { - let counter = match from { - ClientState::Active => &self.cl_active, - ClientState::Idle => &self.cl_idle, - ClientState::Waiting => &self.cl_waiting, - }; - Self::decrease_counter(counter.clone()); - } - - fn decrease_counter(value: Arc) { - if value.load(Ordering::Relaxed) > 0 { - value.fetch_sub(1, Ordering::Relaxed); - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_decrease() { - let stat: PoolStats = PoolStats::default(); - stat.server_login(ServerState::Login); - stat.server_idle(ServerState::Login); - assert_eq!(stat.sv_login.load(Ordering::Relaxed), 0); - assert_eq!(stat.sv_idle.load(Ordering::Relaxed), 1); - } -} diff --git a/src/stats/server.rs b/src/stats/server.rs index a327fa3..e156ee0 100644 --- a/src/stats/server.rs +++ b/src/stats/server.rs @@ -1,5 +1,4 @@ use super::AddressStats; -use super::PoolStats; use super::{get_reporter, Reporter}; use crate::config::Address; use atomic_enum::atomic_enum; @@ -38,7 +37,6 @@ pub struct ServerStats { address: Address, connect_time: Instant, - pool_stats: Arc, reporter: Reporter, /// Data @@ -57,7 +55,6 @@ impl Default for ServerStats { server_id: 0, application_name: Arc::new(RwLock::new(String::new())), address: Address::default(), - pool_stats: Arc::new(PoolStats::default()), connect_time: 
Instant::now(), state: Arc::new(AtomicServerState::new(ServerState::Login)), bytes_sent: Arc::new(AtomicU64::new(0)), @@ -71,10 +68,9 @@ impl Default for ServerStats { } impl ServerStats { - pub fn new(address: Address, pool_stats: Arc, connect_time: Instant) -> Self { + pub fn new(address: Address, connect_time: Instant) -> Self { Self { address, - pool_stats, connect_time, server_id: rand::random::(), ..Default::default() @@ -96,9 +92,6 @@ impl ServerStats { /// Reports a server connection is no longer assigned to a client /// and is available for the next client to pick it up pub fn idle(&self) { - self.pool_stats - .server_idle(self.state.load(Ordering::Relaxed)); - self.state.store(ServerState::Idle, Ordering::Relaxed); } @@ -106,22 +99,16 @@ impl ServerStats { /// Also updates metrics on the pool regarding server usage. pub fn disconnect(&self) { self.reporter.server_disconnecting(self.server_id); - self.pool_stats - .server_disconnect(self.state.load(Ordering::Relaxed)) } /// Reports a server connection is being tested before being given to a client. pub fn tested(&self) { self.set_undefined_application(); - self.pool_stats - .server_tested(self.state.load(Ordering::Relaxed)); self.state.store(ServerState::Tested, Ordering::Relaxed); } /// Reports a server connection is attempting to login. 
pub fn login(&self) { - self.pool_stats - .server_login(self.state.load(Ordering::Relaxed)); self.state.store(ServerState::Login, Ordering::Relaxed); self.set_undefined_application(); } @@ -129,8 +116,6 @@ impl ServerStats { /// Reports a server connection has been assigned to a client that /// is about to query the server pub fn active(&self, application_name: String) { - self.pool_stats - .server_active(self.state.load(Ordering::Relaxed)); self.state.store(ServerState::Active, Ordering::Relaxed); self.set_application(application_name); } @@ -152,11 +137,11 @@ impl ServerStats { // Helper methods for show_servers pub fn pool_name(&self) -> String { - self.pool_stats.database() + self.address.pool_name.clone() } pub fn username(&self) -> String { - self.pool_stats.user() + self.address.username.clone() } pub fn address_name(&self) -> String { @@ -180,9 +165,6 @@ impl ServerStats { // Update server stats and address aggregation stats self.set_application(application_name); self.address.stats.wait_time_add(microseconds); - self.pool_stats - .maxwait - .fetch_max(microseconds, Ordering::Relaxed); } /// Report a query executed by a client against a server diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index f93b1a6..9a85235 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -11,326 +11,6 @@ describe "Admin" do processes.pgcat.shutdown end - describe "SHOW STATS" do - context "clients connect and make one query" do - it "updates *_query_time and *_wait_time" do - connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - Thread.new { c.async_exec("SELECT pg_sleep(0.25)") } - end - sleep(1) - connections.map(&:close) - - # wait for averages to be calculated, we shouldn't do this too often - sleep(15.5) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW STATS")[0] - admin_conn.close - 
expect(results["total_query_time"].to_i).to be_within(200).of(750) - expect(results["avg_query_time"].to_i).to be_within(50).of(250) - - expect(results["total_wait_time"].to_i).to_not eq(0) - expect(results["avg_wait_time"].to_i).to_not eq(0) - end - end - end - - describe "SHOW POOLS" do - context "bad credentials" do - it "does not change any stats" do - bad_password_url = URI(pgcat_conn_str) - bad_password_url.password = "wrong" - expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad) - - sleep(1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - - expect(results["sv_idle"]).to eq("1") - end - end - - context "bad database name" do - it "does not change any stats" do - bad_db_url = URI(pgcat_conn_str) - bad_db_url.path = "/wrong_db" - expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad) - - sleep(1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - - expect(results["sv_idle"]).to eq("1") - end - end - - context "client connects but issues no queries" do - it "only affects cl_idle stats" do - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - - before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"] - connections = Array.new(20) { PG::connect(pgcat_conn_str) } - sleep(1) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req 
sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("20") - expect(results["sv_idle"]).to eq(before_test) - - connections.map(&:close) - sleep(1.1) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["sv_idle"]).to eq(before_test) - end - end - - context "clients connect and make one query" do - it "only affects cl_idle, sv_idle stats" do - connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - Thread.new { c.async_exec("SELECT pg_sleep(2.5)") } - end - - sleep(1.1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_active"]).to eq("5") - expect(results["sv_active"]).to eq("5") - - sleep(3) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("5") - expect(results["sv_idle"]).to eq("5") - - connections.map(&:close) - sleep(1) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - 
end - expect(results["sv_idle"]).to eq("5") - end - end - - context "client connects and opens a transaction and closes connection uncleanly" do - it "produces correct statistics" do - connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - Thread.new do - c.async_exec("BEGIN") - c.async_exec("SELECT pg_sleep(0.01)") - c.close - end - end - - sleep(1.1) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["sv_idle"]).to eq("5") - end - end - - context "client fail to checkout connection from the pool" do - it "counts clients as idle" do - new_configs = processes.pgcat.current_config - new_configs["general"]["connect_timeout"] = 500 - new_configs["general"]["ban_time"] = 1 - new_configs["general"]["shutdown_timeout"] = 1 - new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 - processes.pgcat.update_config(new_configs) - processes.pgcat.reload_config - - threads = [] - connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError } - end - - sleep(2) - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("5") - expect(results["sv_idle"]).to eq("1") - - threads.map(&:join) - connections.map(&:close) - end - end - - context "clients connects and 
disconnect normally" do - let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } - - it 'shows the same number of clients before and after' do - clients_before = clients_connected_to_pool(processes: processes) - threads = [] - connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT 1") } - end - clients_between = clients_connected_to_pool(processes: processes) - expect(clients_before).not_to eq(clients_between) - connections.each(&:close) - clients_after = clients_connected_to_pool(processes: processes) - expect(clients_before).to eq(clients_after) - end - end - - context "clients connects and disconnect abruptly" do - let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } - - it 'shows the same number of clients before and after' do - threads = [] - connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT 1") } - end - clients_before = clients_connected_to_pool(processes: processes) - random_string = (0...8).map { (65 + rand(26)).chr }.join - connection_string = "#{pgcat_conn_str}?application_name=#{random_string}" - faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null") - sleep(1) - # psql starts two processes, we only know the pid of the parent, this - # ensure both are killed - `pkill -9 -f '#{random_string}'` - Process.wait(faulty_client) - clients_after = clients_connected_to_pool(processes: processes) - expect(clients_before).to eq(clients_after) - end - end - - context "clients overwhelm server pools" do - let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } - - it "cl_waiting is updated to show it" do - threads = [] - connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { 
c.async_exec("SELECT pg_sleep(1.5)") } - end - - sleep(1.1) # Allow time for stats to update - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - - expect(results["cl_waiting"]).to eq("2") - expect(results["cl_active"]).to eq("2") - expect(results["sv_active"]).to eq("2") - - sleep(2.5) # Allow time for stats to update - results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s| - raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" - end - expect(results["cl_idle"]).to eq("4") - expect(results["sv_idle"]).to eq("2") - - threads.map(&:join) - connections.map(&:close) - end - - it "show correct max_wait" do - threads = [] - connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } - connections.each do |c| - threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") } - end - - sleep(2.5) # Allow time for stats to update - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - results = admin_conn.async_exec("SHOW POOLS")[0] - - expect(results["maxwait"]).to eq("1") - expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000) - - sleep(4.5) # Allow time for stats to update - results = admin_conn.async_exec("SHOW POOLS")[0] - expect(results["maxwait"]).to eq("0") - - threads.map(&:join) - connections.map(&:close) - end - end - end - - describe "SHOW CLIENTS" do - it "reports correct number and application names" do - conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") - connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") } - - admin_conn = 
PG::connect(processes.pgcat.admin_connection_string) - sleep(1) # Wait for stats to be updated - - results = admin_conn.async_exec("SHOW CLIENTS") - expect(results.count).to eq(21) # count admin clients - expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8) - expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1) - - connections[0..5].map(&:close) - sleep(1) # Wait for stats to be updated - results = admin_conn.async_exec("SHOW CLIENTS") - expect(results.count).to eq(15) - - connections[6..].map(&:close) - sleep(1) # Wait for stats to be updated - expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1) - admin_conn.close - end - - it "reports correct number of queries and transactions" do - conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") - - connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") } - connections.each do |c| - c.async_exec("SELECT 1") - c.async_exec("SELECT 2") - c.async_exec("SELECT 3") - c.async_exec("BEGIN") - c.async_exec("SELECT 4") - c.async_exec("SELECT 5") - c.async_exec("COMMIT") - end - - admin_conn = PG::connect(processes.pgcat.admin_connection_string) - sleep(1) # Wait for stats to be updated - - results = admin_conn.async_exec("SHOW CLIENTS") - expect(results.count).to eq(3) - normal_client_results = results.reject { |r| r["database"] == "pgcat" } - expect(normal_client_results[0]["transaction_count"]).to eq("4") - expect(normal_client_results[1]["transaction_count"]).to eq("4") - expect(normal_client_results[0]["query_count"]).to eq("7") - expect(normal_client_results[1]["query_count"]).to eq("7") - - admin_conn.close - connections.map(&:close) - end - end - describe "Manual Banning" do let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 10) } before do @@ -401,7 +81,7 @@ describe "Admin" do end end - describe "SHOW users" do + describe "SHOW USERS" do it "returns the right users" do 
admin_conn = PG::connect(processes.pgcat.admin_connection_string) results = admin_conn.async_exec("SHOW USERS")[0] diff --git a/tests/ruby/stats_spec.rb b/tests/ruby/stats_spec.rb new file mode 100644 index 0000000..132b80a --- /dev/null +++ b/tests/ruby/stats_spec.rb @@ -0,0 +1,369 @@ +# frozen_string_literal: true +require 'open3' +require_relative 'spec_helper' + +describe "Stats" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") } + + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + describe "SHOW STATS" do + context "clients connect and make one query" do + it "updates *_query_time and *_wait_time" do + connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new { c.async_exec("SELECT pg_sleep(0.25)") } + end + sleep(1) + connections.map(&:close) + + # wait for averages to be calculated, we shouldn't do this too often + sleep(15.5) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW STATS")[0] + admin_conn.close + expect(results["total_query_time"].to_i).to be_within(200).of(750) + expect(results["avg_query_time"].to_i).to be_within(50).of(250) + + expect(results["total_wait_time"].to_i).to_not eq(0) + expect(results["avg_wait_time"].to_i).to_not eq(0) + end + end + end + + describe "SHOW POOLS" do + context "bad credentials" do + it "does not change any stats" do + bad_password_url = URI(pgcat_conn_str) + bad_password_url.password = "wrong" + expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad) + + sleep(1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each 
do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["sv_idle"]).to eq("1") + end + end + + context "bad database name" do + it "does not change any stats" do + bad_db_url = URI(pgcat_conn_str) + bad_db_url.path = "/wrong_db" + expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad) + + sleep(1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["sv_idle"]).to eq("1") + end + end + + context "client connects but issues no queries" do + it "only affects cl_idle stats" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + + before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"] + connections = Array.new(20) { PG::connect(pgcat_conn_str) } + sleep(1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("20") + expect(results["sv_idle"]).to eq(before_test) + + connections.map(&:close) + sleep(1.1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["sv_idle"]).to eq(before_test) + end + end + + context "clients connect and make one query" do + it "only affects cl_idle, sv_idle stats" do + connections = Array.new(5) { 
PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new { c.async_exec("SELECT pg_sleep(2.5)") } + end + + sleep(1.1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_active"]).to eq("5") + expect(results["sv_active"]).to eq("5") + + sleep(3) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("5") + expect(results["sv_idle"]).to eq("5") + + connections.map(&:close) + sleep(1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["sv_idle"]).to eq("5") + end + end + + context "client connects and opens a transaction and closes connection uncleanly" do + it "produces correct statistics" do + connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new do + c.async_exec("BEGIN") + c.async_exec("SELECT pg_sleep(0.01)") + c.close + end + end + + sleep(1.1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + 
expect(results["sv_idle"]).to eq("5") + end + end + + context "client fail to checkout connection from the pool" do + it "counts clients as idle" do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 500 + new_configs["general"]["ban_time"] = 1 + new_configs["general"]["shutdown_timeout"] = 1 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + + threads = [] + connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError } + end + + sleep(2) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("5") + expect(results["sv_idle"]).to eq("1") + + threads.map(&:join) + connections.map(&:close) + end + end + + context "clients connects and disconnect normally" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } + + it 'shows the same number of clients before and after' do + clients_before = clients_connected_to_pool(processes: processes) + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT 1") rescue nil } + end + clients_between = clients_connected_to_pool(processes: processes) + expect(clients_before).not_to eq(clients_between) + connections.each(&:close) + clients_after = clients_connected_to_pool(processes: processes) + expect(clients_before).to eq(clients_after) + end + end + + context "clients connects and disconnect abruptly" do + 
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + + it 'shows the same number of clients before and after' do + threads = [] + connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT 1") } + end + clients_before = clients_connected_to_pool(processes: processes) + random_string = (0...8).map { (65 + rand(26)).chr }.join + connection_string = "#{pgcat_conn_str}?application_name=#{random_string}" + faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null") + sleep(1) + # psql starts two processes, we only know the pid of the parent, this + # ensures both are killed + `pkill -9 -f '#{random_string}'` + Process.wait(faulty_client) + clients_after = clients_connected_to_pool(processes: processes) + expect(clients_before).to eq(clients_after) + end + end + + context "clients overwhelm server pools" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } + + it "cl_waiting is updated to show it" do + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") } + end + + sleep(1.1) # Allow time for stats to update + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["cl_waiting"]).to eq("2") + expect(results["cl_active"]).to eq("2") + expect(results["sv_active"]).to eq("2") + + sleep(2.5) # Allow time for stats to update + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s| + raise StandardError, "Field #{s} 
was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("4") + expect(results["sv_idle"]).to eq("2") + + threads.map(&:join) + connections.map(&:close) + end + + it "show correct max_wait" do + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil } + end + + sleep(2.5) # Allow time for stats to update + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + + expect(results["maxwait"]).to eq("1") + expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000) + connections.map(&:close) + + sleep(4.5) # Allow time for stats to update + results = admin_conn.async_exec("SHOW POOLS")[0] + expect(results["maxwait"]).to eq("0") + + threads.map(&:join) + end + end + end + + describe "SHOW CLIENTS" do + it "reports correct number and application names" do + conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") + connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") } + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + sleep(1) # Wait for stats to be updated + + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(21) # count admin clients + expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8) + expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1) + + connections[0..5].map(&:close) + sleep(1) # Wait for stats to be updated + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(15) + + connections[6..].map(&:close) + sleep(1) # Wait for stats to be updated + expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1) + admin_conn.close + end + + it "reports correct number of queries and 
transactions" do + conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") + + connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") } + connections.each do |c| + c.async_exec("SELECT 1") + c.async_exec("SELECT 2") + c.async_exec("SELECT 3") + c.async_exec("BEGIN") + c.async_exec("SELECT 4") + c.async_exec("SELECT 5") + c.async_exec("COMMIT") + end + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + sleep(1) # Wait for stats to be updated + + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(3) + normal_client_results = results.reject { |r| r["database"] == "pgcat" } + expect(normal_client_results[0]["transaction_count"]).to eq("4") + expect(normal_client_results[1]["transaction_count"]).to eq("4") + expect(normal_client_results[0]["query_count"]).to eq("7") + expect(normal_client_results[1]["query_count"]).to eq("7") + + admin_conn.close + connections.map(&:close) + end + end + + + describe "Query Storm" do + context "when the proxy receives overwhelmingly large number of short quick queries" do + it "should not have lingering clients or active servers" do + new_configs = processes.pgcat.current_config + + new_configs["general"]["connect_timeout"] = 500 + new_configs["general"]["ban_time"] = 1 + new_configs["general"]["shutdown_timeout"] = 1 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + + Array.new(40) do + Thread.new do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SELECT pg_sleep(0.1)") + rescue PG::SystemError + ensure + conn.close + end + end.each(&:join) + + sleep 1 + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_waiting cl_cancel_req sv_used sv_tested sv_login].each do |s| + raise StandardError, "Field #{s} was 
expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + admin_conn.close + end + end + end +end