mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Revert max_wait changes (#658)
* Revert "Reset wait times when checked out successfully (#656)" This reverts commitec3920d60f. * Revert "Not sure how this sneaked past CI" This reverts commit4c5498b915. * Revert "only report wait times from clients currently waiting to match behavior of pgbouncer (#655)" This reverts commit0e8064b049.
This commit is contained in:
@@ -699,7 +699,7 @@ where
|
|||||||
res.put(row_description(&columns));
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
for (_, client) in new_map {
|
for (_, client) in new_map {
|
||||||
let max_wait = client.wait_start.load(Ordering::Relaxed);
|
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
|
||||||
let row = vec![
|
let row = vec![
|
||||||
format!("{:#010X}", client.client_id()),
|
format!("{:#010X}", client.client_id()),
|
||||||
client.pool_name(),
|
client.pool_name(),
|
||||||
|
|||||||
@@ -38,10 +38,8 @@ pub struct ClientStats {
|
|||||||
/// Total time spent waiting for a connection from pool, measures in microseconds
|
/// Total time spent waiting for a connection from pool, measures in microseconds
|
||||||
pub total_wait_time: Arc<AtomicU64>,
|
pub total_wait_time: Arc<AtomicU64>,
|
||||||
|
|
||||||
/// When this client started waiting.
|
/// Maximum time spent waiting for a connection from pool, measures in microseconds
|
||||||
/// Stored as microseconds since connect_time so it can fit in an AtomicU64 instead
|
pub max_wait_time: Arc<AtomicU64>,
|
||||||
/// of us using an "AtomicInstant"
|
|
||||||
pub wait_start: Arc<AtomicU64>,
|
|
||||||
|
|
||||||
/// Current state of the client
|
/// Current state of the client
|
||||||
pub state: Arc<AtomicClientState>,
|
pub state: Arc<AtomicClientState>,
|
||||||
@@ -65,7 +63,7 @@ impl Default for ClientStats {
|
|||||||
username: String::new(),
|
username: String::new(),
|
||||||
pool_name: String::new(),
|
pool_name: String::new(),
|
||||||
total_wait_time: Arc::new(AtomicU64::new(0)),
|
total_wait_time: Arc::new(AtomicU64::new(0)),
|
||||||
wait_start: Arc::new(AtomicU64::new(0)),
|
max_wait_time: Arc::new(AtomicU64::new(0)),
|
||||||
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
|
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
|
||||||
transaction_count: Arc::new(AtomicU64::new(0)),
|
transaction_count: Arc::new(AtomicU64::new(0)),
|
||||||
query_count: Arc::new(AtomicU64::new(0)),
|
query_count: Arc::new(AtomicU64::new(0)),
|
||||||
@@ -109,23 +107,16 @@ impl ClientStats {
|
|||||||
/// Reports a client is done querying the server and is no longer assigned a server connection
|
/// Reports a client is done querying the server and is no longer assigned a server connection
|
||||||
pub fn idle(&self) {
|
pub fn idle(&self) {
|
||||||
self.state.store(ClientState::Idle, Ordering::Relaxed);
|
self.state.store(ClientState::Idle, Ordering::Relaxed);
|
||||||
self.wait_start.store(0, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client is waiting for a connection
|
/// Reports a client is waiting for a connection
|
||||||
pub fn waiting(&self) {
|
pub fn waiting(&self) {
|
||||||
// safe to truncate, we only lose info if duration is greater than ~585,000 years
|
|
||||||
self.wait_start.store(
|
|
||||||
Instant::now().duration_since(self.connect_time).as_micros() as u64,
|
|
||||||
Ordering::Relaxed,
|
|
||||||
);
|
|
||||||
self.state.store(ClientState::Waiting, Ordering::Relaxed);
|
self.state.store(ClientState::Waiting, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client is done waiting for a connection and is about to query the server.
|
/// Reports a client is done waiting for a connection and is about to query the server.
|
||||||
pub fn active(&self) {
|
pub fn active(&self) {
|
||||||
self.state.store(ClientState::Active, Ordering::Relaxed);
|
self.state.store(ClientState::Active, Ordering::Relaxed);
|
||||||
self.wait_start.store(0, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client has failed to obtain a connection from a connection pool
|
/// Reports a client has failed to obtain a connection from a connection pool
|
||||||
@@ -143,6 +134,8 @@ impl ClientStats {
|
|||||||
pub fn checkout_time(&self, microseconds: u64) {
|
pub fn checkout_time(&self, microseconds: u64) {
|
||||||
self.total_wait_time
|
self.total_wait_time
|
||||||
.fetch_add(microseconds, Ordering::Relaxed);
|
.fetch_add(microseconds, Ordering::Relaxed);
|
||||||
|
self.max_wait_time
|
||||||
|
.fetch_max(microseconds, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a query executed by a client against a server
|
/// Report a query executed by a client against a server
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ use super::{ClientState, ServerState};
|
|||||||
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
|
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::atomic::*;
|
use std::sync::atomic::*;
|
||||||
use tokio::time::Instant;
|
|
||||||
|
|
||||||
use crate::pool::get_all_pools;
|
use crate::pool::get_all_pools;
|
||||||
|
|
||||||
@@ -54,7 +53,6 @@ impl PoolStats {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let now = Instant::now();
|
|
||||||
for client in client_map.values() {
|
for client in client_map.values() {
|
||||||
match map.get_mut(&PoolIdentifier {
|
match map.get_mut(&PoolIdentifier {
|
||||||
db: client.pool_name(),
|
db: client.pool_name(),
|
||||||
@@ -64,16 +62,10 @@ impl PoolStats {
|
|||||||
match client.state.load(Ordering::Relaxed) {
|
match client.state.load(Ordering::Relaxed) {
|
||||||
ClientState::Active => pool_stats.cl_active += 1,
|
ClientState::Active => pool_stats.cl_active += 1,
|
||||||
ClientState::Idle => pool_stats.cl_idle += 1,
|
ClientState::Idle => pool_stats.cl_idle += 1,
|
||||||
ClientState::Waiting => {
|
ClientState::Waiting => pool_stats.cl_waiting += 1,
|
||||||
pool_stats.cl_waiting += 1;
|
|
||||||
// wait_start is measured as microseconds since connect_time
|
|
||||||
// so compute wait_time as (now() - connect_time) - (wait_start - connect_time)
|
|
||||||
let duration_since_connect = now.duration_since(client.connect_time());
|
|
||||||
let wait_time = (duration_since_connect.as_micros() as u64)
|
|
||||||
- client.wait_start.load(Ordering::Relaxed);
|
|
||||||
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
|
||||||
|
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
|
||||||
}
|
}
|
||||||
None => debug!("Client from an obselete pool"),
|
None => debug!("Client from an obselete pool"),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -233,7 +233,7 @@ describe "Stats" do
|
|||||||
sleep(1.1) # Allow time for stats to update
|
sleep(1.1) # Allow time for stats to update
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
|
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -260,20 +260,12 @@ describe "Stats" do
|
|||||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
|
||||||
end
|
end
|
||||||
|
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
|
||||||
|
|
||||||
# two connections waiting => they report wait time
|
|
||||||
sleep(1.1) # Allow time for stats to update
|
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
|
||||||
expect(results["maxwait"]).to eq("1")
|
|
||||||
expect(results["maxwait_us"].to_i).to be_within(200_000).of(100_000)
|
|
||||||
|
|
||||||
sleep(2.5) # Allow time for stats to update
|
sleep(2.5) # Allow time for stats to update
|
||||||
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
|
|
||||||
# no connections waiting => no reported wait time
|
expect(results["maxwait"]).to eq("1")
|
||||||
expect(results["maxwait"]).to eq("0")
|
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
|
||||||
expect(results["maxwait_us"]).to eq("0")
|
|
||||||
connections.map(&:close)
|
connections.map(&:close)
|
||||||
|
|
||||||
sleep(4.5) # Allow time for stats to update
|
sleep(4.5) # Allow time for stats to update
|
||||||
|
|||||||
Reference in New Issue
Block a user