From 341ebf412343499e534f6c229e12512664f20e9f Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 7 Mar 2022 23:05:40 -0800 Subject: [PATCH] docs and remove Option (#58) * docs and remove Option * lint --- src/client.rs | 10 ++--- src/pool.rs | 8 ++-- src/server.rs | 6 ++- src/stats.rs | 119 ++++++++++++++++++++++++++++++++++++++------------ 4 files changed, 104 insertions(+), 39 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0fc9c47..dafc97d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -407,12 +407,12 @@ impl Client { } // Report query executed statistics. - self.stats.query(address.id); + self.stats.query(self.process_id, address.id); // The transaction is over, we can release the connection back to the pool. if !server.in_transaction() { // Report transaction executed statistics. - self.stats.transaction(address.id); + self.stats.transaction(self.process_id, address.id); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -493,12 +493,12 @@ impl Client { } // Report query executed statistics. - self.stats.query(address.id); + self.stats.query(self.process_id, address.id); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if !server.in_transaction() { - self.stats.transaction(address.id); + self.stats.transaction(self.process_id, address.id); if self.transaction_mode { self.stats.server_idle(server.process_id(), address.id); @@ -532,7 +532,7 @@ impl Client { // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if !server.in_transaction() { - self.stats.transaction(address.id); + self.stats.transaction(self.process_id, address.id); if self.transaction_mode { self.stats.server_idle(server.process_id(), address.id); diff --git a/src/pool.rs b/src/pool.rs index b534842..046a681 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -234,7 +234,7 @@ impl ConnectionPool { self.ban(address, shard); self.stats.client_disconnecting(process_id, address.id); self.stats - .checkout_time(now.elapsed().as_micros(), address.id); + .checkout_time(now.elapsed().as_micros(), process_id, address.id); continue; } }; @@ -255,7 +255,7 @@ impl ConnectionPool { Ok(res) => match res { Ok(_) => { self.stats - .checkout_time(now.elapsed().as_micros(), address.id); + .checkout_time(now.elapsed().as_micros(), process_id, address.id); self.stats.server_idle(conn.process_id(), address.id); return Ok((conn, address.clone())); } @@ -267,7 +267,7 @@ impl ConnectionPool { self.ban(address, shard); self.stats.client_disconnecting(process_id, address.id); self.stats - .checkout_time(now.elapsed().as_micros(), address.id); + .checkout_time(now.elapsed().as_micros(), process_id, address.id); continue; } }, @@ -280,7 +280,7 @@ impl ConnectionPool { self.ban(address, shard); self.stats.client_disconnecting(process_id, address.id); self.stats - .checkout_time(now.elapsed().as_micros(), address.id); + .checkout_time(now.elapsed().as_micros(), process_id, address.id); continue; } } diff --git a/src/server.rs b/src/server.rs index 4ef9422..aaab47c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -268,7 +268,8 @@ impl Server { /// Send messages to the server from the client. pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { - self.stats.data_sent(messages.len(), self.address.id); + self.stats + .data_sent(messages.len(), self.process_id, self.address.id); match write_all_half(&mut self.write, messages).await { Ok(_) => Ok(()), @@ -374,7 +375,8 @@ impl Server { let bytes = self.buffer.clone(); // Keep track of how much data we got from the server for stats. - self.stats.data_received(bytes.len(), self.address.id); + self.stats + .data_received(bytes.len(), self.process_id, self.address.id); // Clear the buffer for next query. self.buffer.clear(); diff --git a/src/stats.rs b/src/stats.rs index 7d4b025..1b7ccda 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,15 +1,20 @@ +/// Statistics and reporting. use log::info; use once_cell::sync::Lazy; use parking_lot::Mutex; +use std::collections::HashMap; use tokio::sync::mpsc::{Receiver, Sender}; -use std::collections::HashMap; - -// Stats used in SHOW STATS +// Latest stats updated every second; used in SHOW STATS and other admin commands. static LATEST_STATS: Lazy>>> = Lazy::new(|| Mutex::new(HashMap::new())); -static STAT_PERIOD: u64 = 15000; //15 seconds +// Statistics period used for average calculations. +// 15 seconds. +static STAT_PERIOD: u64 = 15000; + +/// The names for the events reported +/// to the statistics collector. #[derive(Debug, Clone, Copy)] enum EventName { CheckoutTime, @@ -30,172 +35,218 @@ enum EventName { UpdateAverages, } +/// Event data sent to the collector +/// from clients and servers. #[derive(Debug)] pub struct Event { + /// The name of the event being reported. name: EventName, + + /// The value being reported. Meaning differs based on event name. value: i64, - process_id: Option, + + /// The client or server connection reporting the event. + process_id: i32, + + /// The server the client is connected to. address_id: usize, } +/// The statistics reporter. An instance is given +/// to each possible source of statistics, +/// e.g. clients, servers, connection pool. #[derive(Clone, Debug)] pub struct Reporter { tx: Sender, } impl Reporter { + /// Create a new Reporter instance. pub fn new(tx: Sender) -> Reporter { Reporter { tx: tx } } - pub fn query(&self, address_id: usize) { + /// Report a query executed by a client against + /// a server identified by the `address_id`. + pub fn query(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::Query, value: 1, - process_id: None, + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn transaction(&self, address_id: usize) { + /// Report a transaction executed by a client against + /// a server identified by the `address_id`. + pub fn transaction(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::Transaction, value: 1, - process_id: None, + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn data_sent(&self, amount: usize, address_id: usize) { + /// Report data sent to a server identified by `address_id`. + /// The `amount` is measured in bytes. + pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) { let event = Event { name: EventName::DataSent, value: amount as i64, - process_id: None, + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn data_received(&self, amount: usize, address_id: usize) { + /// Report data received from a server identified by `address_id`. + /// The `amount` is measured in bytes. + pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) { let event = Event { name: EventName::DataReceived, value: amount as i64, - process_id: None, + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn checkout_time(&self, ms: u128, address_id: usize) { + /// Time spent waiting to get a healthy connection from the pool + /// for a server identified by `address_id`. + /// Measured in milliseconds. + pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) { let event = Event { name: EventName::CheckoutTime, value: ms as i64, - process_id: None, + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a client identified by `process_id` waiting for a connection + /// to a server identified by `address_id`. pub fn client_waiting(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientWaiting, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a client identified by `process_id` is done waiting for a connection + /// to a server identified by `address_id` and is about to query the server. pub fn client_active(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientActive, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a client identified by `process_id` is done querying the server + /// identified by `address_id` and is no longer active. pub fn client_idle(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientIdle, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a client identified by `process_id` is disconecting from the pooler. + /// The last server it was connected to is identified by `address_id`. pub fn client_disconnecting(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientDisconnecting, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a server connection identified by `process_id` for + /// a configured server identified by `address_id` is actively used + /// by a client. pub fn server_active(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerActive, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a server connection identified by `process_id` for + /// a configured server identified by `address_id` is no longer + /// actively used by a client and is now idle. pub fn server_idle(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerIdle, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a server connection identified by `process_id` for + /// a configured server identified by `address_id` is attempting + /// to login. pub fn server_login(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerLogin, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a server connection identified by `process_id` for + /// a configured server identified by `address_id` is being + /// tested before being given to a client. pub fn server_tested(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerTested, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; let _ = self.tx.try_send(event); } + /// Reports a server connection identified by `process_id` is disconecting from the pooler. + /// The configured server it was connected to is identified by `address_id`. pub fn server_disconnecting(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerDisconnecting, value: 1, - process_id: Some(process_id), + process_id: process_id, address_id: address_id, }; @@ -203,16 +254,26 @@ impl Reporter { } } +/// The statistics collector which is receiving statistics +/// from clients, servers, and the connection pool. There is +/// only one collector (kind of like a singleton). +/// The collector can trigger events on its own, e.g. +/// it updates aggregates every second and averages every +/// 15 seconds. pub struct Collector { rx: Receiver, tx: Sender, } impl Collector { + /// Create a new collector instance. There should only be one instance + /// at a time. This is ensured by mpsc which allows only one receiver. pub fn new(rx: Receiver, tx: Sender) -> Collector { Collector { rx, tx } } + /// The statistics collection handler. It will collect statistics + /// for `address_id`s starting at 0 up to `addresses`. pub async fn collect(&mut self, addresses: usize) { info!("Events reporter started"); @@ -261,7 +322,7 @@ impl Collector { let _ = tx.try_send(Event { name: EventName::UpdateStats, value: 0, - process_id: None, + process_id: -1, address_id: address_id, }); } @@ -278,7 +339,7 @@ impl Collector { let _ = tx.try_send(Event { name: EventName::UpdateAverages, value: 0, - process_id: None, + process_id: -1, address_id: address_id, }); } @@ -352,11 +413,11 @@ impl Collector { | EventName::ServerIdle | EventName::ServerTested | EventName::ServerLogin => { - client_server_states.insert(stat.process_id.unwrap(), stat.name); + client_server_states.insert(stat.process_id, stat.name); } EventName::ClientDisconnecting | EventName::ServerDisconnecting => { - client_server_states.remove(&stat.process_id.unwrap()); + client_server_states.remove(&stat.process_id); } EventName::UpdateStats => { @@ -449,6 +510,8 @@ impl Collector { } } +/// Get a snapshot of statistics. Updated once a second +/// by the `Collector`. pub fn get_stats() -> HashMap> { LATEST_STATS.lock().clone() }