diff --git a/src/client.rs b/src/client.rs index 1b921b8..7a11d60 100644 --- a/src/client.rs +++ b/src/client.rs @@ -192,6 +192,9 @@ impl Client { // We expect the client to either start a transaction with regular queries // or issue commands for our sharding and server selection protocols. loop { + // Client idle, waiting for messages. + self.stats.client_idle(self.process_id); + // Read a complete message from the client, which normally would be // either a `Q` (query) or `P` (prepare, extended protocol). // We can parse it here before grabbing a server from the pool, @@ -243,6 +246,9 @@ impl Client { continue; } + // Waiting for server connection. + self.stats.client_waiting(self.process_id); + // Grab a server from the pool: the client issued a regular query. let connection = match pool.get(query_router.shard(), query_router.role()).await { Ok(conn) => conn, @@ -261,6 +267,9 @@ impl Client { // Claim this server as mine for query cancellation. server.claim(self.process_id, self.secret_key); + // Client active + self.stats.client_active(self.process_id); + // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, // or until the client disconnects if we are in session mode. @@ -330,7 +339,6 @@ impl Client { // If we are in session mode, we keep the server until the client disconnects. if self.transaction_mode { // Report this client as idle. - self.stats.client_idle(); break; } } @@ -412,7 +420,6 @@ impl Client { self.stats.transaction(); if self.transaction_mode { - self.stats.client_idle(); break; } } @@ -446,7 +453,6 @@ impl Client { self.stats.transaction(); if self.transaction_mode { - self.stats.client_idle(); break; } } @@ -471,3 +477,9 @@ impl Client { guard.remove(&(self.process_id, self.secret_key)); } } + +impl Drop for Client { + fn drop(&mut self) { + self.stats.client_disconnecting(self.process_id); + } +} diff --git a/src/pool.rs b/src/pool.rs index 29ee40b..bea260b 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -147,10 +147,6 @@ impl ConnectionPool { role: Option, ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let now = Instant::now(); - - // We are waiting for a server now. - self.stats.client_waiting(); - let addresses = &self.addresses[shard]; let mut allowed_attempts = match role { @@ -222,7 +218,6 @@ impl ConnectionPool { Ok(res) => match res { Ok(_) => { self.stats.checkout_time(now.elapsed().as_micros()); - self.stats.client_active(); return Ok((conn, address.clone())); } Err(_) => { diff --git a/src/stats.rs b/src/stats.rs index f3e0790..e74c068 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -7,7 +7,7 @@ use std::time::Instant; use crate::config::get_config; -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub enum StatisticName { CheckoutTime, //QueryRuntime, @@ -16,15 +16,17 @@ pub enum StatisticName { Transactions, DataSent, DataReceived, - ClientsWaiting, - ClientsActive, - ClientsIdle, + ClientWaiting, + ClientActive, + ClientIdle, + ClientDisconnecting, } #[derive(Debug)] pub struct Statistic { pub name: StatisticName, pub value: i64, + pub process_id: Option, } #[derive(Clone, Debug)] @@ -41,6 +43,7 @@ impl Reporter { let statistic = Statistic { name: StatisticName::Queries, value: 1, + process_id: None, }; let _ = self.tx.try_send(statistic); @@ -50,6 +53,7 @@ impl Reporter { let statistic = Statistic { name: StatisticName::Transactions, value: 1, + process_id: None, }; let _ = self.tx.try_send(statistic); @@ -59,6 +63,7 @@ impl Reporter { let statistic = Statistic { name: StatisticName::DataSent, value: amount as i64, + process_id: None, }; let _ = self.tx.try_send(statistic); @@ -68,6 +73,7 @@ impl Reporter { let statistic = Statistic { name: StatisticName::DataReceived, value: amount as i64, + process_id: None, }; let _ = self.tx.try_send(statistic); @@ -77,54 +83,48 @@ impl Reporter { let statistic = Statistic { name: StatisticName::CheckoutTime, value: ms as i64, + process_id: None, }; let _ = self.tx.try_send(statistic); } - pub fn client_waiting(&mut self) { + pub fn client_waiting(&mut self, process_id: i32) { let statistic = Statistic { - name: StatisticName::ClientsWaiting, + name: StatisticName::ClientWaiting, value: 1, - }; - - let _ = self.tx.try_send(statistic); - - let statistic = Statistic { - name: StatisticName::ClientsIdle, - value: -1, + process_id: Some(process_id), }; let _ = self.tx.try_send(statistic); } - pub fn client_active(&mut self) { - let statistic = Statistic { - name: StatisticName::ClientsWaiting, - value: -1, - }; - - let _ = self.tx.try_send(statistic); + pub fn client_active(&mut self, process_id: i32) { let statistic = Statistic { - name: StatisticName::ClientsActive, + name: StatisticName::ClientActive, value: 1, + process_id: Some(process_id), }; let _ = self.tx.try_send(statistic); } - pub fn client_idle(&mut self) { + pub fn client_idle(&mut self, process_id: i32) { let statistic = Statistic { - name: StatisticName::ClientsActive, - value: -1, + name: StatisticName::ClientIdle, + value: 1, + process_id: Some(process_id), }; let _ = self.tx.try_send(statistic); + } + pub fn client_disconnecting(&mut self, process_id: i32) { let statistic = Statistic { - name: StatisticName::ClientsIdle, + name: StatisticName::ClientDisconnecting, value: 1, + process_id: Some(process_id), }; let _ = self.tx.try_send(statistic); @@ -158,6 +158,8 @@ impl Collector { ("cl_idle", 0), ]); + let mut client_states: HashMap = HashMap::new(); + let mut now = Instant::now(); loop { @@ -210,32 +212,43 @@ impl Collector { } } - StatisticName::ClientsActive => { - let counter = stats.entry("cl_active").or_insert(0); - - *counter += stat.value; - *counter = std::cmp::max(*counter, 0); + StatisticName::ClientActive | StatisticName::ClientWaiting | StatisticName::ClientIdle => { + client_states.insert(stat.process_id.unwrap(), stat.name); } - StatisticName::ClientsWaiting => { - let counter = stats.entry("cl_waiting").or_insert(0); - *counter += stat.value; - *counter = std::cmp::max(*counter, 0); - } - - StatisticName::ClientsIdle => { - let counter = stats.entry("cl_idle").or_insert(0); - *counter += stat.value; - *counter = std::cmp::max(*counter, 0); + StatisticName::ClientDisconnecting => { + client_states.remove(&stat.process_id.unwrap()); } }; + // It's been 15 seconds. If there is no traffic, it won't publish anything, // but it also doesn't matter then. if now.elapsed().as_secs() > 15 { - let mut pipeline = self.client.pipeline(); + for (_, state) in &client_states { + match state { + StatisticName::ClientActive => { + let counter = stats.entry("cl_active").or_insert(0); + *counter += 1; + } - println!(">> Publishing statistics to StatsD: {:?}", stats); + StatisticName::ClientWaiting => { + let counter = stats.entry("cl_waiting").or_insert(0); + *counter += 1; + } + + StatisticName::ClientIdle => { + let counter = stats.entry("cl_idle").or_insert(0); + *counter += 1; + } + + _ => unreachable!(), + }; + } + + println!(">> Reporting to StatsD: {:?}", stats); + + let mut pipeline = self.client.pipeline(); for (key, value) in stats.iter_mut() { pipeline.gauge(key, *value as f64);