docs and remove Option (#58)

* docs and remove Option

* lint
This commit is contained in:
Lev Kokotov
2022-03-07 23:05:40 -08:00
committed by GitHub
parent 35828a0a8c
commit 341ebf4123
4 changed files with 104 additions and 39 deletions

View File

@@ -407,12 +407,12 @@ impl Client {
} }
// Report query executed statistics. // Report query executed statistics.
self.stats.query(address.id); self.stats.query(self.process_id, address.id);
// The transaction is over, we can release the connection back to the pool. // The transaction is over, we can release the connection back to the pool.
if !server.in_transaction() { if !server.in_transaction() {
// Report transaction executed statistics. // Report transaction executed statistics.
self.stats.transaction(address.id); self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
@@ -493,12 +493,12 @@ impl Client {
} }
// Report query executed statistics. // Report query executed statistics.
self.stats.query(address.id); self.stats.query(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(address.id); self.stats.transaction(self.process_id, address.id);
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id); self.stats.server_idle(server.process_id(), address.id);
@@ -532,7 +532,7 @@ impl Client {
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(address.id); self.stats.transaction(self.process_id, address.id);
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id); self.stats.server_idle(server.process_id(), address.id);

View File

@@ -234,7 +234,7 @@ impl ConnectionPool {
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id); self.stats.client_disconnecting(process_id, address.id);
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
}; };
@@ -255,7 +255,7 @@ impl ConnectionPool {
Ok(res) => match res { Ok(res) => match res {
Ok(_) => { Ok(_) => {
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id); self.stats.server_idle(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
@@ -267,7 +267,7 @@ impl ConnectionPool {
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id); self.stats.client_disconnecting(process_id, address.id);
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
}, },
@@ -280,7 +280,7 @@ impl ConnectionPool {
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id); self.stats.client_disconnecting(process_id, address.id);
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
} }

View File

@@ -268,7 +268,8 @@ impl Server {
/// Send messages to the server from the client. /// Send messages to the server from the client.
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
self.stats.data_sent(messages.len(), self.address.id); self.stats
.data_sent(messages.len(), self.process_id, self.address.id);
match write_all_half(&mut self.write, messages).await { match write_all_half(&mut self.write, messages).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
@@ -374,7 +375,8 @@ impl Server {
let bytes = self.buffer.clone(); let bytes = self.buffer.clone();
// Keep track of how much data we got from the server for stats. // Keep track of how much data we got from the server for stats.
self.stats.data_received(bytes.len(), self.address.id); self.stats
.data_received(bytes.len(), self.process_id, self.address.id);
// Clear the buffer for next query. // Clear the buffer for next query.
self.buffer.clear(); self.buffer.clear();

View File

@@ -1,15 +1,20 @@
/// Statistics and reporting.
use log::info; use log::info;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashMap;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use std::collections::HashMap; // Latest stats updated every second; used in SHOW STATS and other admin commands.
// Stats used in SHOW STATS
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> = static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
Lazy::new(|| Mutex::new(HashMap::new())); Lazy::new(|| Mutex::new(HashMap::new()));
static STAT_PERIOD: u64 = 15000; //15 seconds
// Statistics period used for average calculations.
// 15 seconds.
static STAT_PERIOD: u64 = 15000;
/// The names for the events reported
/// to the statistics collector.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum EventName { enum EventName {
CheckoutTime, CheckoutTime,
@@ -30,172 +35,218 @@ enum EventName {
UpdateAverages, UpdateAverages,
} }
/// Event data sent to the collector
/// from clients and servers.
#[derive(Debug)] #[derive(Debug)]
pub struct Event { pub struct Event {
/// The name of the event being reported.
name: EventName, name: EventName,
/// The value being reported. Meaning differs based on event name.
value: i64, value: i64,
process_id: Option<i32>,
/// The client or server connection reporting the event.
process_id: i32,
/// The server the client is connected to.
address_id: usize, address_id: usize,
} }
/// The statistics reporter. An instance is given
/// to each possible source of statistics,
/// e.g. clients, servers, connection pool.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Reporter { pub struct Reporter {
tx: Sender<Event>, tx: Sender<Event>,
} }
impl Reporter { impl Reporter {
/// Create a new Reporter instance.
pub fn new(tx: Sender<Event>) -> Reporter { pub fn new(tx: Sender<Event>) -> Reporter {
Reporter { tx: tx } Reporter { tx: tx }
} }
pub fn query(&self, address_id: usize) { /// Report a query executed by a client against
/// a server identified by the `address_id`.
pub fn query(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::Query, name: EventName::Query,
value: 1, value: 1,
process_id: None, process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn transaction(&self, address_id: usize) { /// Report a transaction executed by a client against
/// a server identified by the `address_id`.
pub fn transaction(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::Transaction, name: EventName::Transaction,
value: 1, value: 1,
process_id: None, process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn data_sent(&self, amount: usize, address_id: usize) { /// Report data sent to a server identified by `address_id`.
/// The `amount` is measured in bytes.
pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::DataSent, name: EventName::DataSent,
value: amount as i64, value: amount as i64,
process_id: None, process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn data_received(&self, amount: usize, address_id: usize) { /// Report data received from a server identified by `address_id`.
/// The `amount` is measured in bytes.
pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::DataReceived, name: EventName::DataReceived,
value: amount as i64, value: amount as i64,
process_id: None, process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn checkout_time(&self, ms: u128, address_id: usize) { /// Time spent waiting to get a healthy connection from the pool
/// for a server identified by `address_id`.
/// Measured in milliseconds.
pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::CheckoutTime, name: EventName::CheckoutTime,
value: ms as i64, value: ms as i64,
process_id: None, process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a client identified by `process_id` waiting for a connection
/// to a server identified by `address_id`.
pub fn client_waiting(&self, process_id: i32, address_id: usize) { pub fn client_waiting(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientWaiting, name: EventName::ClientWaiting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a client identified by `process_id` is done waiting for a connection
/// to a server identified by `address_id` and is about to query the server.
pub fn client_active(&self, process_id: i32, address_id: usize) { pub fn client_active(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientActive, name: EventName::ClientActive,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a client identified by `process_id` is done querying the server
/// identified by `address_id` and is no longer active.
pub fn client_idle(&self, process_id: i32, address_id: usize) { pub fn client_idle(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientIdle, name: EventName::ClientIdle,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a client identified by `process_id` is disconecting from the pooler.
/// The last server it was connected to is identified by `address_id`.
pub fn client_disconnecting(&self, process_id: i32, address_id: usize) { pub fn client_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientDisconnecting, name: EventName::ClientDisconnecting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is actively used
/// by a client.
pub fn server_active(&self, process_id: i32, address_id: usize) { pub fn server_active(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerActive, name: EventName::ServerActive,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is no longer
/// actively used by a client and is now idle.
pub fn server_idle(&self, process_id: i32, address_id: usize) { pub fn server_idle(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerIdle, name: EventName::ServerIdle,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is attempting
/// to login.
pub fn server_login(&self, process_id: i32, address_id: usize) { pub fn server_login(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerLogin, name: EventName::ServerLogin,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is being
/// tested before being given to a client.
pub fn server_tested(&self, process_id: i32, address_id: usize) { pub fn server_tested(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerTested, name: EventName::ServerTested,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
/// The configured server it was connected to is identified by `address_id`.
pub fn server_disconnecting(&self, process_id: i32, address_id: usize) { pub fn server_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerDisconnecting, name: EventName::ServerDisconnecting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id, address_id: address_id,
}; };
@@ -203,16 +254,26 @@ impl Reporter {
} }
} }
/// The statistics collector which is receiving statistics
/// from clients, servers, and the connection pool. There is
/// only one collector (kind of like a singleton).
/// The collector can trigger events on its own, e.g.
/// it updates aggregates every second and averages every
/// 15 seconds.
pub struct Collector { pub struct Collector {
rx: Receiver<Event>, rx: Receiver<Event>,
tx: Sender<Event>, tx: Sender<Event>,
} }
impl Collector { impl Collector {
/// Create a new collector instance. There should only be one instance
/// at a time. This is ensured by mpsc which allows only one receiver.
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector { pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
Collector { rx, tx } Collector { rx, tx }
} }
/// The statistics collection handler. It will collect statistics
/// for `address_id`s starting at 0 up to `addresses`.
pub async fn collect(&mut self, addresses: usize) { pub async fn collect(&mut self, addresses: usize) {
info!("Events reporter started"); info!("Events reporter started");
@@ -261,7 +322,7 @@ impl Collector {
let _ = tx.try_send(Event { let _ = tx.try_send(Event {
name: EventName::UpdateStats, name: EventName::UpdateStats,
value: 0, value: 0,
process_id: None, process_id: -1,
address_id: address_id, address_id: address_id,
}); });
} }
@@ -278,7 +339,7 @@ impl Collector {
let _ = tx.try_send(Event { let _ = tx.try_send(Event {
name: EventName::UpdateAverages, name: EventName::UpdateAverages,
value: 0, value: 0,
process_id: None, process_id: -1,
address_id: address_id, address_id: address_id,
}); });
} }
@@ -352,11 +413,11 @@ impl Collector {
| EventName::ServerIdle | EventName::ServerIdle
| EventName::ServerTested | EventName::ServerTested
| EventName::ServerLogin => { | EventName::ServerLogin => {
client_server_states.insert(stat.process_id.unwrap(), stat.name); client_server_states.insert(stat.process_id, stat.name);
} }
EventName::ClientDisconnecting | EventName::ServerDisconnecting => { EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
client_server_states.remove(&stat.process_id.unwrap()); client_server_states.remove(&stat.process_id);
} }
EventName::UpdateStats => { EventName::UpdateStats => {
@@ -449,6 +510,8 @@ impl Collector {
} }
} }
/// Get a snapshot of statistics. Updated once a second
/// by the `Collector`.
pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> { pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
LATEST_STATS.lock().clone() LATEST_STATS.lock().clone()
} }