mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 10:46:30 +00:00
better avg calc
This commit is contained in:
33
src/stats.rs
33
src/stats.rs
@@ -1,9 +1,10 @@
|
|||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
/// Statistics and reporting.
|
/// Statistics and reporting.
|
||||||
use log::{error, info, trace};
|
use log::{error, info, trace, warn};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::time::SystemTime;
|
||||||
use tokio::sync::mpsc::error::TrySendError;
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
|
||||||
@@ -346,6 +347,13 @@ impl Collector {
|
|||||||
// Track which state the client and server are at any given time.
|
// Track which state the client and server are at any given time.
|
||||||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
||||||
|
|
||||||
|
// Average update times
|
||||||
|
let mut last_updated_avg: HashMap<usize, SystemTime> = HashMap::new();
|
||||||
|
|
||||||
|
for address_id in 0..get_number_of_addresses() {
|
||||||
|
last_updated_avg.insert(address_id, SystemTime::now());
|
||||||
|
}
|
||||||
|
|
||||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||||
let tx = self.tx.clone();
|
let tx = self.tx.clone();
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
@@ -406,6 +414,9 @@ impl Collector {
|
|||||||
.entry(stat.address_id)
|
.entry(stat.address_id)
|
||||||
.or_insert(HashMap::new());
|
.or_insert(HashMap::new());
|
||||||
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
||||||
|
let last_updated_avg = last_updated_avg
|
||||||
|
.entry(stat.address_id)
|
||||||
|
.or_insert(SystemTime::now());
|
||||||
|
|
||||||
// Some are counters, some are gauges...
|
// Some are counters, some are gauges...
|
||||||
match stat.name {
|
match stat.name {
|
||||||
@@ -531,6 +542,24 @@ impl Collector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
EventName::UpdateAverages => {
|
EventName::UpdateAverages => {
|
||||||
|
let elapsed = match last_updated_avg.elapsed() {
|
||||||
|
Ok(elapsed) => elapsed.as_secs(),
|
||||||
|
Err(err) => {
|
||||||
|
error!(
|
||||||
|
"Could not get elapsed time, averages may be incorrect: {:?}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
STAT_PERIOD / 1_000
|
||||||
|
}
|
||||||
|
} as i64;
|
||||||
|
|
||||||
|
*last_updated_avg = SystemTime::now();
|
||||||
|
|
||||||
|
// Tokio triggers the interval on first tick and then sleeps.
|
||||||
|
if elapsed == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate averages
|
// Calculate averages
|
||||||
for stat in &[
|
for stat in &[
|
||||||
"avg_query_count",
|
"avg_query_count",
|
||||||
@@ -548,7 +577,7 @@ impl Collector {
|
|||||||
|
|
||||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
let avg = (new_value - *old_value) / elapsed; // Avg / second
|
||||||
|
|
||||||
stats.insert(stat, avg);
|
stats.insert(stat, avg);
|
||||||
*old_value = new_value;
|
*old_value = new_value;
|
||||||
|
|||||||
Reference in New Issue
Block a user