mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
What is wrong
Stats reported by SHOW POOLS seem to be leaking. We see lingering `cl_idle` and `cl_waiting` counts, and similarly for `sv_idle` and `sv_active`. We confirmed that these are reporting issues, not actual lingering clients.
This behavior is readily reproducible by running:
while true; do
psql "postgres://sharding_user:sharding_user@localhost:6432/sharded_db" -c "SELECT 1" > /dev/null 2>&1 &
done
Why it happens
I wasn't able to pin down the root cause of the bug, but my best guess is that we have race conditions when updating pool-level stats: even though each individual update operation is atomic, we perform a check-then-update sequence that is not protected by a lock.
https://github.com/postgresml/pgcat/blob/main/src/stats/pool.rs#L174-L179
I also suspected that using `Relaxed` ordering might allow this behavior, but after changing all operations to use `Ordering::SeqCst` I still saw lingering clients.
How to fix
Since SHOW POOLS, SHOW SERVERS and SHOW CLIENTS all show the current state of the proxy (as opposed to SHOW STATS, which shows aggregate values), this PR refactors SHOW POOLS to construct its results directly from the SHOW SERVERS and SHOW CLIENTS datasets. This reduces the complexity of stat updates and eliminates the need for locks when updating pool stats, as we only need to update individual client/server states.
This changes the semantics of `maxwait`: instead of holding the maximum wait time ever encountered by any client (connected or disconnected), it will only consider currently connected clients. This should be acceptable given that PgCat tends to hold on to client connections longer than PgBouncer does.
131 lines
4.4 KiB
Rust
131 lines
4.4 KiB
Rust
//! Statistics and reporting.
|
|
use arc_swap::ArcSwap;
|
|
|
|
use log::{info, warn};
|
|
use once_cell::sync::Lazy;
|
|
use parking_lot::RwLock;
|
|
use std::collections::HashMap;
|
|
|
|
use std::sync::Arc;
|
|
|
|
// Structs that hold stats for different resources
|
|
pub mod address;
|
|
pub mod client;
|
|
pub mod pool;
|
|
pub mod server;
|
|
pub use address::AddressStats;
|
|
pub use client::{ClientState, ClientStats};
|
|
pub use server::{ServerState, ServerStats};
|
|
|
|
/// Convenience types for various stats
|
|
type ClientStatesLookup = HashMap<i32, Arc<ClientStats>>;
|
|
type ServerStatesLookup = HashMap<i32, Arc<ServerStats>>;
|
|
|
|
/// Stats for individual client connections
|
|
/// Used in SHOW CLIENTS.
|
|
static CLIENT_STATS: Lazy<Arc<RwLock<ClientStatesLookup>>> =
|
|
Lazy::new(|| Arc::new(RwLock::new(ClientStatesLookup::default())));
|
|
|
|
/// Stats for individual server connections
|
|
/// Used in SHOW SERVERS.
|
|
static SERVER_STATS: Lazy<Arc<RwLock<ServerStatesLookup>>> =
|
|
Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default())));
|
|
|
|
/// The statistics reporter. An instance is given to each possible source of statistics,
|
|
/// e.g. client stats, server stats, connection pool stats.
|
|
pub static REPORTER: Lazy<ArcSwap<Reporter>> =
|
|
Lazy::new(|| ArcSwap::from_pointee(Reporter::default()));
|
|
|
|
/// Statistics period used for average calculations.
|
|
/// 15 seconds.
|
|
static STAT_PERIOD: u64 = 15000;
|
|
|
|
/// Handle through which clients, servers and connection pools report statistics.
///
/// It carries no state of its own; its methods write into this module's
/// global stats tables, so clones are interchangeable.
#[derive(Default, Debug, Clone)]
pub struct Reporter {}
|
|
|
|
impl Reporter {
|
|
/// Register a client with the stats system. The stats system uses client_id
|
|
/// to track and aggregate statistics from all source that relate to that client
|
|
fn client_register(&self, client_id: i32, stats: Arc<ClientStats>) {
|
|
if CLIENT_STATS.read().get(&client_id).is_some() {
|
|
warn!("Client {:?} was double registered!", client_id);
|
|
return;
|
|
}
|
|
|
|
CLIENT_STATS.write().insert(client_id, stats);
|
|
}
|
|
|
|
/// Reports a client is disconnecting from the pooler.
|
|
fn client_disconnecting(&self, client_id: i32) {
|
|
CLIENT_STATS.write().remove(&client_id);
|
|
}
|
|
|
|
/// Register a server connection with the stats system. The stats system uses server_id
|
|
/// to track and aggregate statistics from all source that relate to that server
|
|
fn server_register(&self, server_id: i32, stats: Arc<ServerStats>) {
|
|
SERVER_STATS.write().insert(server_id, stats);
|
|
}
|
|
/// Reports a server connection is disconnecting from the pooler.
|
|
fn server_disconnecting(&self, server_id: i32) {
|
|
SERVER_STATS.write().remove(&server_id);
|
|
}
|
|
}
|
|
|
|
/// Background collector that periodically recomputes averaged statistics.
///
/// Only one collector is meant to run (it acts like a singleton); calling
/// `collect` starts the periodic task that refreshes averages every 15 seconds.
#[derive(Default)]
pub struct Collector {}
|
|
|
|
impl Collector {
|
|
/// The statistics collection handler. It will collect statistics
|
|
/// for `address_id`s starting at 0 up to `addresses`.
|
|
pub async fn collect(&mut self) {
|
|
info!("Events reporter started");
|
|
|
|
tokio::task::spawn(async move {
|
|
let mut interval =
|
|
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
|
|
loop {
|
|
interval.tick().await;
|
|
|
|
// Hold read lock for duration of update to retain all server stats
|
|
let server_stats = SERVER_STATS.read();
|
|
|
|
for stats in server_stats.values() {
|
|
if !stats.check_address_stat_average_is_updated_status() {
|
|
stats.address_stats().update_averages();
|
|
stats.address_stats().reset_current_counts();
|
|
stats.set_address_stat_average_is_updated_status(true);
|
|
}
|
|
}
|
|
|
|
// Reset to false for next update
|
|
for stats in server_stats.values() {
|
|
stats.set_address_stat_average_is_updated_status(false);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
/// Get a snapshot of client statistics.
|
|
/// by the `Collector`.
|
|
pub fn get_client_stats() -> ClientStatesLookup {
|
|
CLIENT_STATS.read().clone()
|
|
}
|
|
|
|
/// Get a snapshot of server statistics.
|
|
/// by the `Collector`.
|
|
pub fn get_server_stats() -> ServerStatesLookup {
|
|
SERVER_STATS.read().clone()
|
|
}
|
|
|
|
/// Get the statistics reporter used to update stats across the pools/clients.
|
|
pub fn get_reporter() -> Reporter {
|
|
(*(*REPORTER.load())).clone()
|
|
}
|