Fix maxwait metric (#183)

Max wait was being reported as 0 after #159

This PR fixes that and adds a test
This commit is contained in:
Mostafa Abdelraouf
2022-10-05 21:41:09 -05:00
committed by GitHub
parent 7987c5ffad
commit 3d33ccf4b0
4 changed files with 55 additions and 23 deletions

View File

@@ -228,7 +228,13 @@ where
pool_config.pool_mode.to_string(), pool_config.pool_mode.to_string(),
]; ];
for column in &columns[3..columns.len()] { for column in &columns[3..columns.len()] {
let value = pool_stats.get(column.0).unwrap_or(&0).to_string(); let value = match column.0 {
"maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(),
"maxwait_us" => {
(pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string()
}
_other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(),
};
row.push(value); row.push(value);
} }
res.put(data_row(&row)); res.put(data_row(&row));

View File

@@ -333,7 +333,6 @@ impl ConnectionPool {
role: Option<Role>, // primary or replica role: Option<Role>, // primary or replica
process_id: i32, // client id process_id: i32, // client id
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
let now = Instant::now();
let mut candidates: Vec<&Address> = self.addresses[shard] let mut candidates: Vec<&Address> = self.addresses[shard]
.iter() .iter()
.filter(|address| address.role == role) .filter(|address| address.role == role)
@@ -358,6 +357,7 @@ impl ConnectionPool {
} }
// Indicate we're waiting on a server connection from a pool. // Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
self.stats.client_waiting(process_id); self.stats.client_waiting(process_id);
// Check if we can connect // Check if we can connect
@@ -397,7 +397,7 @@ impl ConnectionPool {
match tokio::time::timeout( match tokio::time::timeout(
tokio::time::Duration::from_millis(healthcheck_timeout), tokio::time::Duration::from_millis(healthcheck_timeout),
server.query(";"), // Cheap query (query parser not used in PG) server.query(";"), // Cheap query as it skips the query planner
) )
.await .await
{ {

View File

@@ -580,15 +580,15 @@ impl Collector {
server_info.query_count += stat.value as u64; server_info.query_count += stat.value as u64;
server_info.application_name = app_name; server_info.application_name = app_name;
let pool_stats = address_stat_lookup let address_stats = address_stat_lookup
.entry(server_info.address_id) .entry(server_info.address_id)
.or_insert(HashMap::default()); .or_insert(HashMap::default());
let counter = pool_stats let counter = address_stats
.entry("total_query_count".to_string()) .entry("total_query_count".to_string())
.or_insert(0); .or_insert(0);
*counter += stat.value; *counter += stat.value;
let duration = pool_stats let duration = address_stats
.entry("total_query_time".to_string()) .entry("total_query_time".to_string())
.or_insert(0); .or_insert(0);
*duration += duration_ms as i64; *duration += duration_ms as i64;
@@ -681,26 +681,26 @@ impl Collector {
Some(server_info) => { Some(server_info) => {
server_info.application_name = app_name; server_info.application_name = app_name;
let pool_stats = address_stat_lookup let address_stats = address_stat_lookup
.entry(server_info.address_id) .entry(server_info.address_id)
.or_insert(HashMap::default()); .or_insert(HashMap::default());
let counter = let counter = address_stats
pool_stats.entry("total_wait_time".to_string()).or_insert(0); .entry("total_wait_time".to_string())
.or_insert(0);
*counter += stat.value; *counter += stat.value;
let counter = pool_stats.entry("maxwait_us".to_string()).or_insert(0); let pool_stats = pool_stat_lookup
let mic_part = stat.value % 1_000_000; .entry((
server_info.pool_name.clone(),
server_info.username.clone(),
))
.or_insert(HashMap::default());
// Report max time here // We record max wait in microseconds, we do the pgbouncer second/microsecond split on admin
if mic_part > *counter { let old_microseconds =
*counter = mic_part; pool_stats.entry("maxwait_us".to_string()).or_insert(0);
} if stat.value > *old_microseconds {
*old_microseconds = stat.value;
let counter = pool_stats.entry("maxwait".to_string()).or_insert(0);
let seconds = *counter / 1_000_000;
if seconds > *counter {
*counter = seconds;
} }
} }
None => (), None => (),
@@ -903,8 +903,6 @@ impl Collector {
"sv_active", "sv_active",
"sv_tested", "sv_tested",
"sv_login", "sv_login",
"maxwait",
"maxwait_us",
] { ] {
pool_stats.insert(stat.to_string(), 0); pool_stats.insert(stat.to_string(), 0);
} }
@@ -962,6 +960,12 @@ impl Collector {
LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); LATEST_CLIENT_STATS.store(Arc::new(client_states.clone()));
LATEST_SERVER_STATS.store(Arc::new(server_states.clone())); LATEST_SERVER_STATS.store(Arc::new(server_states.clone()));
LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone())); LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone()));
// Clear maxwait after reporting
pool_stat_lookup
.entry((pool_name.clone(), username.clone()))
.or_insert(HashMap::default())
.insert("maxwait_us".to_string(), 0);
} }
EventName::UpdateAverages { address_id } => { EventName::UpdateAverages { address_id } => {

View File

@@ -208,6 +208,28 @@ describe "Admin" do
threads.map(&:join) threads.map(&:join)
connections.map(&:close) connections.map(&:close)
end end
it "show correct max_wait" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(100_000).of(500_000)
sleep(4.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("0")
threads.map(&:join)
connections.map(&:close)
end
end end
end end