From 206fdc97699d111e33b6153206b88edebe433f08 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sat, 26 Feb 2022 10:03:11 -0800 Subject: [PATCH] Fix some stats (#47) * fix some stats * use constant * lint --- src/stats.rs | 70 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 61 insertions(+), 9 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 40a8ab7..399bee6 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,4 +1,4 @@ -use log::{error, info}; +use log::{debug, error, info}; use once_cell::sync::OnceCell; use statsd::Client; /// Events collector and publisher. @@ -10,6 +10,7 @@ use std::sync::{Arc, Mutex}; use crate::config::get_config; static LATEST_STATS: OnceCell>>> = OnceCell::new(); +static STAT_PERIOD: u64 = 15000; //15 seconds #[derive(Debug, Clone, Copy)] enum EventName { @@ -228,7 +229,15 @@ impl Collector { ("total_xact_count", 0), ("total_sent", 0), ("total_received", 0), + ("total_xact_time", 0), + ("total_query_time", 0), ("total_wait_time", 0), + ("avg_xact_time", 0), + ("avg_query_time", 0), + ("avg_xact_count", 0), + ("avg_sent", 0), + ("avg_received", 0), + ("avg_wait_time", 0), ("maxwait_us", 0), ("maxwait", 0), ("cl_waiting", 0), @@ -240,11 +249,18 @@ impl Collector { ("sv_tested", 0), ]); - let mut client_server_states: HashMap = HashMap::new(); - let tx = self.tx.clone(); + // Stats saved after each iteration of the flush event. Used in calculation + // of averages in the last flush period. + let mut old_stats: HashMap = HashMap::new(); + // Track which state the client and server are at any given time. + let mut client_server_states: HashMap = HashMap::new(); + + // Flush stats to StatsD and calculate averages every 15 seconds. + let tx = self.tx.clone(); tokio::task::spawn(async move { - let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000)); + let mut interval = + tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD)); loop { interval.tick().await; let _ = tx.try_send(Event { @@ -255,6 +271,7 @@ impl Collector { } }); + // The collector loop loop { let stat = match self.rx.recv().await { Some(stat) => stat, @@ -291,10 +308,11 @@ impl Collector { *counter += stat.value; let counter = stats.entry("maxwait_us").or_insert(0); + let mic_part = stat.value % 1_000_000; // Report max time here - if stat.value > *counter { - *counter = stat.value; + if mic_part > *counter { + *counter = mic_part; } let counter = stats.entry("maxwait").or_insert(0); @@ -320,6 +338,7 @@ impl Collector { } EventName::FlushStatsToStatsD => { + // Calculate connection states for (_, state) in &client_server_states { match state { EventName::ClientActive => { @@ -361,8 +380,26 @@ impl Collector { }; } - info!("{:?}", stats); + // Calculate averages + for stat in &[ + "avg_query_count", + "avgxact_count", + "avg_sent", + "avg_received", + "avg_wait_time", + ] { + let total_name = stat.replace("avg_", "total_"); + let old_value = old_stats.entry(total_name.clone()).or_insert(0); + let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); + let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second + stats.insert(stat, avg); + *old_value = new_value; + } + + debug!("{:?}", stats); + + // Update latest stats used in SHOW STATS match LATEST_STATS.get() { Some(arc) => { let mut guard = arc.lock().unwrap(); @@ -376,9 +413,24 @@ impl Collector { let mut pipeline = self.client.pipeline(); - for (key, value) in stats.iter_mut() { + for (key, value) in stats.iter() { pipeline.gauge(key, *value as f64); - *value = 0; + } + + // These are re-calculated every iteration of the loop, so we don't want to add values + // from the last iteration. + for stat in &[ + "cl_active", + "cl_waiting", + "cl_idle", + "sv_idle", + "sv_active", + "sv_tested", + "sv_login", + "maxwait", + "maxwait_us", + ] { + stats.insert(stat, 0); } pipeline.send(&self.client);