Per-shard statistics (#57)

* per shard stats

* aight

* cleaner

* fix show lists

* comments

* more friendly

* case-insensitive

* test all shards

* ok

* HUH?
This commit is contained in:
Lev Kokotov
2022-03-04 17:04:27 -08:00
committed by GitHub
parent 1e8fa110ae
commit 35828a0a8c
12 changed files with 286 additions and 142 deletions

View File

@@ -1,15 +1,13 @@
use log::{debug, info};
use log::info;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use statsd::Client;
use tokio::sync::mpsc::{Receiver, Sender};
use std::collections::HashMap;
use crate::config::get_config;
// Stats used in SHOW STATS
static LATEST_STATS: Lazy<Mutex<HashMap<String, i64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
static STAT_PERIOD: u64 = 15000; //15 seconds
#[derive(Debug, Clone, Copy)]
@@ -28,7 +26,8 @@ enum EventName {
ServerTested,
ServerLogin,
ServerDisconnecting,
FlushStatsToStatsD,
UpdateStats,
UpdateAverages,
}
#[derive(Debug)]
@@ -36,6 +35,7 @@ pub struct Event {
name: EventName,
value: i64,
process_id: Option<i32>,
address_id: usize,
}
#[derive(Clone, Debug)]
@@ -48,141 +48,155 @@ impl Reporter {
Reporter { tx: tx }
}
pub fn query(&self) {
pub fn query(&self, address_id: usize) {
let event = Event {
name: EventName::Query,
value: 1,
process_id: None,
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn transaction(&self) {
pub fn transaction(&self, address_id: usize) {
let event = Event {
name: EventName::Transaction,
value: 1,
process_id: None,
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn data_sent(&self, amount: usize) {
pub fn data_sent(&self, amount: usize, address_id: usize) {
let event = Event {
name: EventName::DataSent,
value: amount as i64,
process_id: None,
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn data_received(&self, amount: usize) {
pub fn data_received(&self, amount: usize, address_id: usize) {
let event = Event {
name: EventName::DataReceived,
value: amount as i64,
process_id: None,
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn checkout_time(&self, ms: u128) {
pub fn checkout_time(&self, ms: u128, address_id: usize) {
let event = Event {
name: EventName::CheckoutTime,
value: ms as i64,
process_id: None,
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn client_waiting(&self, process_id: i32) {
pub fn client_waiting(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientWaiting,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn client_active(&self, process_id: i32) {
pub fn client_active(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientActive,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn client_idle(&self, process_id: i32) {
pub fn client_idle(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientIdle,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn client_disconnecting(&self, process_id: i32) {
pub fn client_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientDisconnecting,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn server_active(&self, process_id: i32) {
pub fn server_active(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerActive,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn server_idle(&self, process_id: i32) {
pub fn server_idle(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerIdle,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn server_login(&self, process_id: i32) {
pub fn server_login(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerLogin,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn server_tested(&self, process_id: i32) {
pub fn server_tested(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerTested,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
}
pub fn server_disconnecting(&self, process_id: i32) {
pub fn server_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerDisconnecting,
value: 1,
process_id: Some(process_id),
address_id: address_id,
};
let _ = self.tx.try_send(event);
@@ -192,22 +206,17 @@ impl Reporter {
pub struct Collector {
rx: Receiver<Event>,
tx: Sender<Event>,
client: Client,
}
impl Collector {
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
Collector {
rx,
tx,
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
}
Collector { rx, tx }
}
pub async fn collect(&mut self) {
pub async fn collect(&mut self, addresses: usize) {
info!("Events reporter started");
let mut stats = HashMap::from([
let stats_template = HashMap::from([
("total_query_count", 0),
("total_xact_count", 0),
("total_sent", 0),
@@ -232,25 +241,47 @@ impl Collector {
("sv_tested", 0),
]);
let mut stats = HashMap::new();
// Stats saved after each iteration of the flush event. Used in calculation
// of averages in the last flush period.
let mut old_stats: HashMap<String, i64> = HashMap::new();
let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new();
// Track which state the client and server are at any given time.
let mut client_server_states: HashMap<i32, EventName> = HashMap::new();
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
// Flush stats to StatsD and calculate averages every 15 seconds.
let tx = self.tx.clone();
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15));
loop {
interval.tick().await;
for address_id in 0..addresses {
let _ = tx.try_send(Event {
name: EventName::UpdateStats,
value: 0,
process_id: None,
address_id: address_id,
});
}
}
});
let tx = self.tx.clone();
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
loop {
interval.tick().await;
let _ = tx.try_send(Event {
name: EventName::FlushStatsToStatsD,
value: 0,
process_id: None,
});
for address_id in 0..addresses {
let _ = tx.try_send(Event {
name: EventName::UpdateAverages,
value: 0,
process_id: None,
address_id: address_id,
});
}
}
});
@@ -264,6 +295,14 @@ impl Collector {
}
};
let stats = stats
.entry(stat.address_id)
.or_insert(stats_template.clone());
let client_server_states = client_server_states
.entry(stat.address_id)
.or_insert(HashMap::new());
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
// Some are counters, some are gauges...
match stat.name {
EventName::Query => {
@@ -320,9 +359,9 @@ impl Collector {
client_server_states.remove(&stat.process_id.unwrap());
}
EventName::FlushStatsToStatsD => {
EventName::UpdateStats => {
// Calculate connection states
for (_, state) in &client_server_states {
for (_, state) in client_server_states.iter() {
match state {
EventName::ClientActive => {
let counter = stats.entry("cl_active").or_insert(0);
@@ -334,11 +373,6 @@ impl Collector {
*counter += 1;
}
EventName::ClientIdle => {
let counter = stats.entry("cl_idle").or_insert(0);
*counter += 1;
}
EventName::ServerIdle => {
let counter = stats.entry("sv_idle").or_insert(0);
*counter += 1;
@@ -359,39 +393,20 @@ impl Collector {
*counter += 1;
}
EventName::ClientIdle => {
let counter = stats.entry("cl_idle").or_insert(0);
*counter += 1;
}
_ => unreachable!(),
};
}
// Calculate averages
for stat in &[
"avg_query_count",
"avgxact_count",
"avg_sent",
"avg_received",
"avg_wait_time",
] {
let total_name = stat.replace("avg_", "total_");
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
stats.insert(stat, avg);
*old_value = new_value;
}
debug!("{:?}", stats);
// Update latest stats used in SHOW STATS
let mut guard = LATEST_STATS.lock();
for (key, value) in &stats {
guard.insert(key.to_string(), value.clone());
}
let mut pipeline = self.client.pipeline();
for (key, value) in stats.iter() {
pipeline.gauge(key, *value as f64);
let entry = guard.entry(stat.address_id).or_insert(HashMap::new());
entry.insert(key.to_string(), value.clone());
}
// These are re-calculated every iteration of the loop, so we don't want to add values
@@ -409,14 +424,31 @@ impl Collector {
] {
stats.insert(stat, 0);
}
}
pipeline.send(&self.client);
EventName::UpdateAverages => {
// Calculate averages
for stat in &[
"avg_query_count",
"avgxact_count",
"avg_sent",
"avg_received",
"avg_wait_time",
] {
let total_name = stat.replace("avg_", "total_");
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
stats.insert(stat, avg);
*old_value = new_value;
}
}
};
}
}
}
pub fn get_stats() -> HashMap<String, i64> {
pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
LATEST_STATS.lock().clone()
}