diff --git a/src/pool.rs b/src/pool.rs index 6c9a7a2..fb33fd8 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -351,7 +351,12 @@ impl ManageConnection for ServerPool { self.address, self.user.name ); - Server::startup( + // Put a temporary process_id into the stats + // for server login. + let process_id = rand::random::(); + self.stats.server_login(process_id); + + match Server::startup( &self.address, &self.user, &self.database, @@ -359,6 +364,18 @@ impl ManageConnection for ServerPool { self.stats.clone(), ) .await + { + Ok(conn) => { + // Remove the temporary process_id from the stats. + self.stats.server_disconnecting(process_id); + Ok(conn) + } + Err(err) => { + // Remove the temporary process_id from the stats. + self.stats.server_disconnecting(process_id); + Err(err) + } + } } /// Determines if the connection is still connected to the database. diff --git a/src/stats.rs b/src/stats.rs index beb430f..0332b5f 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,6 +1,6 @@ use log::info; use statsd::Client; -/// Statistics collector and publisher. +/// Events collector and publisher. use tokio::sync::mpsc::{Receiver, Sender}; use std::collections::HashMap; @@ -9,12 +9,10 @@ use std::time::Instant; use crate::config::get_config; #[derive(Debug, Clone, Copy)] -pub enum StatisticName { +enum EventName { CheckoutTime, - //QueryRuntime, - //TransactionTime, - Queries, - Transactions, + Query, + Transaction, DataSent, DataReceived, ClientWaiting, @@ -29,25 +27,25 @@ pub enum StatisticName { } #[derive(Debug)] -pub struct Statistic { - pub name: StatisticName, - pub value: i64, - pub process_id: Option, +pub struct Event { + name: EventName, + value: i64, + process_id: Option, } #[derive(Clone, Debug)] pub struct Reporter { - tx: Sender, + tx: Sender, } impl Reporter { - pub fn new(tx: Sender) -> Reporter { + pub fn new(tx: Sender) -> Reporter { Reporter { tx: tx } } - pub fn query(&mut self) { - let statistic = Statistic { - name: StatisticName::Queries, + pub fn query(&self) { + let statistic = Event { + name: EventName::Query, value: 1, process_id: None, }; @@ -55,9 +53,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn transaction(&mut self) { - let statistic = Statistic { - name: StatisticName::Transactions, + pub fn transaction(&self) { + let statistic = Event { + name: EventName::Transaction, value: 1, process_id: None, }; @@ -65,9 +63,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn data_sent(&mut self, amount: usize) { - let statistic = Statistic { - name: StatisticName::DataSent, + pub fn data_sent(&self, amount: usize) { + let statistic = Event { + name: EventName::DataSent, value: amount as i64, process_id: None, }; @@ -75,9 +73,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn data_received(&mut self, amount: usize) { - let statistic = Statistic { - name: StatisticName::DataReceived, + pub fn data_received(&self, amount: usize) { + let statistic = Event { + name: EventName::DataReceived, value: amount as i64, process_id: None, }; @@ -85,9 +83,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn checkout_time(&mut self, ms: u128) { - let statistic = Statistic { - name: StatisticName::CheckoutTime, + pub fn checkout_time(&self, ms: u128) { + let statistic = Event { + name: EventName::CheckoutTime, value: ms as i64, process_id: None, }; @@ -95,9 +93,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn client_waiting(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ClientWaiting, + pub fn client_waiting(&self, process_id: i32) { + let statistic = Event { + name: EventName::ClientWaiting, value: 1, process_id: Some(process_id), }; @@ -105,9 +103,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn client_active(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ClientActive, + pub fn client_active(&self, process_id: i32) { + let statistic = Event { + name: EventName::ClientActive, value: 1, process_id: Some(process_id), }; @@ -115,9 +113,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn client_idle(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ClientIdle, + pub fn client_idle(&self, process_id: i32) { + let statistic = Event { + name: EventName::ClientIdle, value: 1, process_id: Some(process_id), }; @@ -125,9 +123,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn client_disconnecting(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ClientDisconnecting, + pub fn client_disconnecting(&self, process_id: i32) { + let statistic = Event { + name: EventName::ClientDisconnecting, value: 1, process_id: Some(process_id), }; @@ -135,9 +133,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn server_active(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ServerActive, + pub fn server_active(&self, process_id: i32) { + let statistic = Event { + name: EventName::ServerActive, value: 1, process_id: Some(process_id), }; @@ -145,9 +143,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn server_idle(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ServerIdle, + pub fn server_idle(&self, process_id: i32) { + let statistic = Event { + name: EventName::ServerIdle, value: 1, process_id: Some(process_id), }; @@ -155,9 +153,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn server_login(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ServerLogin, + pub fn server_login(&self, process_id: i32) { + let statistic = Event { + name: EventName::ServerLogin, value: 1, process_id: Some(process_id), }; @@ -165,9 +163,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn server_tested(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ServerTested, + pub fn server_tested(&self, process_id: i32) { + let statistic = Event { + name: EventName::ServerTested, value: 1, process_id: Some(process_id), }; @@ -175,9 +173,9 @@ impl Reporter { let _ = self.tx.try_send(statistic); } - pub fn server_disconnecting(&mut self, process_id: i32) { - let statistic = Statistic { - name: StatisticName::ServerDisconnecting, + pub fn server_disconnecting(&self, process_id: i32) { + let statistic = Event { + name: EventName::ServerDisconnecting, value: 1, process_id: Some(process_id), }; @@ -187,12 +185,12 @@ impl Reporter { } pub struct Collector { - rx: Receiver, + rx: Receiver, client: Client, } impl Collector { - pub fn new(rx: Receiver) -> Collector { + pub fn new(rx: Receiver) -> Collector { Collector { rx: rx, client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(), @@ -200,7 +198,7 @@ impl Collector { } pub async fn collect(&mut self) { - info!("Statistics reporter started"); + info!("Events reporter started"); let mut stats = HashMap::from([ ("total_query_count", 0), @@ -219,7 +217,7 @@ impl Collector { ("sv_tested", 0), ]); - let mut client_server_states: HashMap = HashMap::new(); + let mut client_server_states: HashMap = HashMap::new(); let mut now = Instant::now(); @@ -227,34 +225,34 @@ impl Collector { let stat = match self.rx.recv().await { Some(stat) => stat, None => { - info!("Statistics collector is shutting down"); + info!("Events collector is shutting down"); return; } }; // Some are counters, some are gauges... match stat.name { - StatisticName::Queries => { + EventName::Query => { let counter = stats.entry("total_query_count").or_insert(0); *counter += stat.value; } - StatisticName::Transactions => { + EventName::Transaction => { let counter = stats.entry("total_xact_count").or_insert(0); *counter += stat.value; } - StatisticName::DataSent => { + EventName::DataSent => { let counter = stats.entry("total_sent").or_insert(0); *counter += stat.value; } - StatisticName::DataReceived => { + EventName::DataReceived => { let counter = stats.entry("total_received").or_insert(0); *counter += stat.value; } - StatisticName::CheckoutTime => { + EventName::CheckoutTime => { let counter = stats.entry("total_wait_time").or_insert(0); *counter += stat.value; @@ -273,17 +271,17 @@ impl Collector { } } - StatisticName::ClientActive - | StatisticName::ClientWaiting - | StatisticName::ClientIdle - | StatisticName::ServerActive - | StatisticName::ServerIdle - | StatisticName::ServerTested - | StatisticName::ServerLogin => { + EventName::ClientActive + | EventName::ClientWaiting + | EventName::ClientIdle + | EventName::ServerActive + | EventName::ServerIdle + | EventName::ServerTested + | EventName::ServerLogin => { client_server_states.insert(stat.process_id.unwrap(), stat.name); } - StatisticName::ClientDisconnecting | StatisticName::ServerDisconnecting => { + EventName::ClientDisconnecting | EventName::ServerDisconnecting => { client_server_states.remove(&stat.process_id.unwrap()); } }; @@ -293,37 +291,37 @@ impl Collector { if now.elapsed().as_secs() > 15 { for (_, state) in &client_server_states { match state { - StatisticName::ClientActive => { + EventName::ClientActive => { let counter = stats.entry("cl_active").or_insert(0); *counter += 1; } - StatisticName::ClientWaiting => { + EventName::ClientWaiting => { let counter = stats.entry("cl_waiting").or_insert(0); *counter += 1; } - StatisticName::ClientIdle => { + EventName::ClientIdle => { let counter = stats.entry("cl_idle").or_insert(0); *counter += 1; } - StatisticName::ServerIdle => { + EventName::ServerIdle => { let counter = stats.entry("sv_idle").or_insert(0); *counter += 1; } - StatisticName::ServerActive => { + EventName::ServerActive => { let counter = stats.entry("sv_active").or_insert(0); *counter += 1; } - StatisticName::ServerTested => { + EventName::ServerTested => { let counter = stats.entry("sv_tested").or_insert(0); *counter += 1; } - StatisticName::ServerLogin => { + EventName::ServerLogin => { let counter = stats.entry("sv_login").or_insert(0); *counter += 1; }