only report wait times from clients currently waiting to match behavior of pgbouncer (#655)

* Change maxwait to only report wait times from clients currently waiting to match behavior of pgbouncer

* Fix tests
This commit is contained in:
Daniel Babiak
2023-12-04 21:19:51 -05:00
committed by GitHub
parent 4dbef49ec9
commit 0e8064b049
3 changed files with 33 additions and 12 deletions

View File

@@ -38,8 +38,10 @@ pub struct ClientStats {
/// Total time spent waiting for a connection from pool, measures in microseconds /// Total time spent waiting for a connection from pool, measures in microseconds
pub total_wait_time: Arc<AtomicU64>, pub total_wait_time: Arc<AtomicU64>,
/// Maximum time spent waiting for a connection from pool, measures in microseconds /// When this client started waiting.
pub max_wait_time: Arc<AtomicU64>, /// Stored as microseconds since connect_time so it can fit in an AtomicU64 instead
/// of us using an "AtomicInstant"
pub wait_start: Arc<AtomicU64>,
/// Current state of the client /// Current state of the client
pub state: Arc<AtomicClientState>, pub state: Arc<AtomicClientState>,
@@ -63,7 +65,7 @@ impl Default for ClientStats {
username: String::new(), username: String::new(),
pool_name: String::new(), pool_name: String::new(),
total_wait_time: Arc::new(AtomicU64::new(0)), total_wait_time: Arc::new(AtomicU64::new(0)),
max_wait_time: Arc::new(AtomicU64::new(0)), wait_start: Arc::new(AtomicU64::new(0)),
state: Arc::new(AtomicClientState::new(ClientState::Idle)), state: Arc::new(AtomicClientState::new(ClientState::Idle)),
transaction_count: Arc::new(AtomicU64::new(0)), transaction_count: Arc::new(AtomicU64::new(0)),
query_count: Arc::new(AtomicU64::new(0)), query_count: Arc::new(AtomicU64::new(0)),
@@ -111,6 +113,11 @@ impl ClientStats {
/// Reports a client is waiting for a connection /// Reports a client is waiting for a connection
pub fn waiting(&self) { pub fn waiting(&self) {
// safe to truncate, we only lose info if duration is greater than ~585,000 years
self.wait_start.store(
Instant::now().duration_since(self.connect_time).as_micros() as u64,
Ordering::Relaxed,
);
self.state.store(ClientState::Waiting, Ordering::Relaxed); self.state.store(ClientState::Waiting, Ordering::Relaxed);
} }
@@ -134,8 +141,6 @@ impl ClientStats {
pub fn checkout_time(&self, microseconds: u64) { pub fn checkout_time(&self, microseconds: u64) {
self.total_wait_time self.total_wait_time
.fetch_add(microseconds, Ordering::Relaxed); .fetch_add(microseconds, Ordering::Relaxed);
self.max_wait_time
.fetch_max(microseconds, Ordering::Relaxed);
} }
/// Report a query executed by a client against a server /// Report a query executed by a client against a server

View File

@@ -4,6 +4,7 @@ use super::{ClientState, ServerState};
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier}; use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::*; use std::sync::atomic::*;
use tokio::time::Instant;
use crate::pool::get_all_pools; use crate::pool::get_all_pools;
@@ -53,6 +54,7 @@ impl PoolStats {
); );
} }
let now = Instant::now();
for client in client_map.values() { for client in client_map.values() {
match map.get_mut(&PoolIdentifier { match map.get_mut(&PoolIdentifier {
db: client.pool_name(), db: client.pool_name(),
@@ -62,10 +64,16 @@ impl PoolStats {
match client.state.load(Ordering::Relaxed) { match client.state.load(Ordering::Relaxed) {
ClientState::Active => pool_stats.cl_active += 1, ClientState::Active => pool_stats.cl_active += 1,
ClientState::Idle => pool_stats.cl_idle += 1, ClientState::Idle => pool_stats.cl_idle += 1,
ClientState::Waiting => pool_stats.cl_waiting += 1, ClientState::Waiting => {
pool_stats.cl_waiting += 1;
// wait_start is measured as microseconds since connect_time
// so compute wait_time as (now() - connect_time) - (wait_start - connect_time)
let duration_since_connect = now.duration_since(client.connect_time());
let wait_time = (duration_since_connect.as_micros() as u64)
- client.wait_start.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time);
}
} }
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
} }
None => debug!("Client from an obselete pool"), None => debug!("Client from an obselete pool"),
} }

View File

@@ -233,7 +233,7 @@ describe "Stats" do
sleep(1.1) # Allow time for stats to update sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string) admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0] results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end end
@@ -260,12 +260,20 @@ describe "Stats" do
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil } threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string) admin_conn = PG::connect(processes.pgcat.admin_connection_string)
# two connections waiting => they report wait time
sleep(1.1) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(100_000)
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0] results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1") # no connections waiting => no reported wait time
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000) expect(results["maxwait"]).to eq("0")
expect(results["maxwait_us"]).to eq("0")
connections.map(&:close) connections.map(&:close)
sleep(4.5) # Allow time for stats to update sleep(4.5) # Allow time for stats to update