From 8e5e28a1395688a8f7a17f118e7ebaa990c3a043 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 14 Feb 2022 10:00:55 -0800 Subject: [PATCH] Some stats (#19) --- Cargo.lock | 71 ++++++++++++++++++++-- Cargo.toml | 1 + src/client.rs | 53 +++++++++++----- src/main.rs | 29 ++++++++- src/pool.rs | 22 ++++++- src/server.rs | 11 +++- src/stats.rs | 163 ++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 325 insertions(+), 25 deletions(-) create mode 100644 src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index 1f18149..5ddda15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "futures-channel" version = "0.3.19" @@ -325,11 +331,12 @@ dependencies = [ "md-5", "num_cpus", "once_cell", - "rand", + "rand 0.8.4", "regex", "serde", "serde_derive", "sha-1", + "statsd", "tokio", "toml", ] @@ -370,6 +377,29 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +dependencies = [ + "libc", + "rand 0.4.6", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.8.4" @@ -378,7 +408,7 @@ checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" dependencies = [ "libc", "rand_chacha", - "rand_core", + "rand_core 0.6.3", "rand_hc", ] @@ -389,9 +419,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.3", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.6.3" @@ -407,7 +452,16 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" dependencies = [ - "rand_core", + "rand_core 0.6.3", +] + +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", ] [[package]] @@ -491,6 +545,15 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "statsd" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5181f817969f3b9f76f70866611c0555b5537f8fdbf14cbdadebb9b54155bd5a" +dependencies = [ + "rand 0.3.23", +] + [[package]] name = "syn" version = "1.0.86" diff --git a/Cargo.toml b/Cargo.toml index 6db820c..8e89b49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,4 @@ serde_derive = "1" regex = "1" num_cpus = "1" once_cell = "1" +statsd = "0.14" diff --git a/src/client.rs b/src/client.rs index 43b3e63..cbb9872 100644 --- a/src/client.rs +++ b/src/client.rs @@ -16,6 +16,7 @@ use crate::messages::*; use crate::pool::{ClientServerMap, ConnectionPool}; use crate::server::Server; use crate::sharding::Sharder; +use crate::stats::Reporter; pub const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+';"; pub const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)';"; @@ -57,6 +58,9 @@ pub struct Client { // Client parameters, e.g. user, client_encoding, etc. parameters: HashMap, + + // Statistics + stats: Reporter, } impl Client { @@ -69,6 +73,7 @@ impl Client { transaction_mode: bool, default_server_role: Option, server_info: BytesMut, + stats: Reporter, ) -> Result { loop { // Could be StartupMessage or SSLRequest @@ -127,6 +132,7 @@ impl Client { client_server_map: client_server_map, default_server_role: default_server_role, parameters: parameters, + stats: stats, }); } @@ -148,6 +154,7 @@ impl Client { client_server_map: client_server_map, default_server_role: default_server_role, parameters: HashMap::new(), + stats: stats, }); } @@ -220,7 +227,6 @@ impl Client { }; // Grab a server from the pool. - // None = any shard let connection = match pool.get(shard, role).await { Ok(conn) => conn, Err(err) => { @@ -287,11 +293,19 @@ impl Client { } } - // Release server - if !server.in_transaction() && self.transaction_mode { - shard = None; - role = self.default_server_role; - break; + // Send statistic + self.stats.query(); + + // Transaction over + if !server.in_transaction() { + self.stats.transaction(); + + // Release server + if self.transaction_mode { + shard = None; + role = self.default_server_role; + break; + } } } @@ -350,11 +364,17 @@ impl Client { } } + self.stats.query(); + // Release server - if !server.in_transaction() && self.transaction_mode { - shard = None; - role = self.default_server_role; - break; + if !server.in_transaction() { + self.stats.transaction(); + + if self.transaction_mode { + shard = None; + role = self.default_server_role; + break; + } } } @@ -378,11 +398,14 @@ impl Client { }; // Release the server - if !server.in_transaction() && self.transaction_mode { - println!("Releasing after copy done"); - shard = None; - role = self.default_server_role; - break; + if !server.in_transaction() { + self.stats.transaction(); + + if self.transaction_mode { + shard = None; + role = self.default_server_role; + break; + } } } diff --git a/src/main.rs b/src/main.rs index 29b0a2c..b4d51ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,6 +21,7 @@ extern crate num_cpus; extern crate once_cell; extern crate serde; extern crate serde_derive; +extern crate statsd; extern crate tokio; extern crate toml; @@ -30,6 +31,7 @@ use tokio::signal; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; mod client; mod config; @@ -38,11 +40,13 @@ mod messages; mod pool; mod server; mod sharding; +mod stats; // Support for query cancellation: this maps our process_ids and // secret keys to the backend's. use config::Role; use pool::{ClientServerMap, ConnectionPool}; +use stats::{Collector, Reporter}; /// Main! #[tokio::main(worker_threads = 4)] @@ -87,7 +91,23 @@ async fn main() { ); println!("> Connection timeout: {}ms", config.general.connect_timeout); - let mut pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await; + // Collect statistics and send them to StatsD + let (tx, rx) = mpsc::channel(100); + + tokio::task::spawn(async move { + println!("> Statistics reporter started"); + + let mut stats_collector = Collector::new(rx); + stats_collector.collect().await; + }); + + let mut pool = ConnectionPool::from_config( + config.clone(), + client_server_map.clone(), + Reporter::new(tx.clone()), + ) + .await; + let transaction_mode = config.general.pool_mode == "transaction"; let default_server_role = match config.query_router.default_role.as_ref() { "any" => None, @@ -115,6 +135,7 @@ async fn main() { let pool = pool.clone(); let client_server_map = client_server_map.clone(); let server_info = server_info.clone(); + let reporter = Reporter::new(tx.clone()); let (socket, addr) = match listener.accept().await { Ok((socket, addr)) => (socket, addr), @@ -136,6 +157,7 @@ async fn main() { transaction_mode, default_server_role, server_info, + reporter, ) .await { @@ -170,7 +192,10 @@ async fn main() { // Setup shut down sequence match signal::ctrl_c().await { - Ok(()) => {} + Ok(()) => { + println!("> Shutting down..."); + } + Err(err) => { eprintln!("Unable to listen for shutdown signal: {}", err); } diff --git a/src/pool.rs b/src/pool.rs index a4f4bb3..62710eb 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -7,6 +7,7 @@ use chrono::naive::NaiveDateTime; use crate::config::{Address, Config, Role, User}; use crate::errors::Error; use crate::server::Server; +use crate::stats::Reporter; use std::collections::HashMap; use std::sync::{ @@ -14,6 +15,7 @@ use std::sync::{ Arc, Mutex, }; +use std::time::Instant; // Banlist: bad servers go in here. pub type BanList = Arc>>>; @@ -29,11 +31,16 @@ pub struct ConnectionPool { healthcheck_timeout: u64, ban_time: i64, pool_size: u32, + stats: Reporter, } impl ConnectionPool { /// Construct the connection pool from a config file. - pub async fn from_config(config: Config, client_server_map: ClientServerMap) -> ConnectionPool { + pub async fn from_config( + config: Config, + client_server_map: ClientServerMap, + stats: Reporter, + ) -> ConnectionPool { let mut shards = Vec::new(); let mut addresses = Vec::new(); let mut banlist = Vec::new(); @@ -71,6 +78,7 @@ impl ConnectionPool { config.user.clone(), &shard.database, client_server_map.clone(), + stats.clone(), ); let pool = Pool::builder() @@ -103,6 +111,7 @@ impl ConnectionPool { healthcheck_timeout: config.general.healthcheck_timeout, ban_time: config.general.ban_time, pool_size: config.general.pool_size, + stats: stats, } } @@ -149,6 +158,7 @@ impl ConnectionPool { ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { // Set this to false to gain ~3-4% speed. let with_health_check = true; + let now = Instant::now(); let shard = match shard { Some(shard) => shard, @@ -227,6 +237,7 @@ impl ConnectionPool { }; if !with_health_check { + self.stats.checkout_time(now.elapsed().as_millis()); return Ok((conn, address.clone())); } @@ -241,7 +252,10 @@ impl ConnectionPool { { // Check if health check succeeded Ok(res) => match res { - Ok(_) => return Ok((conn, address.clone())), + Ok(_) => { + self.stats.checkout_time(now.elapsed().as_millis()); + return Ok((conn, address.clone())); + } Err(_) => { println!( ">> Banning replica {} because of failed health check", @@ -340,6 +354,7 @@ pub struct ServerPool { user: User, database: String, client_server_map: ClientServerMap, + stats: Reporter, } impl ServerPool { @@ -348,12 +363,14 @@ impl ServerPool { user: User, database: &str, client_server_map: ClientServerMap, + stats: Reporter, ) -> ServerPool { ServerPool { address: address, user: user, database: database.to_string(), client_server_map: client_server_map, + stats: stats, } } } @@ -375,6 +392,7 @@ impl ManageConnection for ServerPool { &self.database, self.client_server_map.clone(), self.address.role, + self.stats.clone(), ) .await } diff --git a/src/server.rs b/src/server.rs index ef3c611..ce55d9c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,6 +11,7 @@ use tokio::net::TcpStream; use crate::config::{Address, Role}; use crate::errors::Error; use crate::messages::*; +use crate::stats::Reporter; use crate::ClientServerMap; /// Server state. @@ -54,6 +55,9 @@ pub struct Server { // Server connected at connected_at: chrono::naive::NaiveDateTime, + + // Stats + stats: Reporter, } impl Server { @@ -67,6 +71,7 @@ impl Server { database: &str, client_server_map: ClientServerMap, role: Role, + stats: Reporter, ) -> Result { let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await { Ok(stream) => stream, @@ -198,6 +203,7 @@ impl Server { client_server_map: client_server_map, role: role, connected_at: chrono::offset::Utc::now().naive_utc(), + stats: stats, }); } @@ -236,6 +242,8 @@ impl Server { /// Send data to the server from the client. pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { + self.stats.data_sent(messages.len()); + match write_all_half(&mut self.write, messages).await { Ok(_) => Ok(()), Err(err) => { @@ -280,8 +288,6 @@ impl Server { self.in_transaction = false; } - // Error client didn't clean up! - // We shuold drop this server 'E' => { self.in_transaction = true; } @@ -332,6 +338,7 @@ impl Server { } let bytes = self.buffer.clone(); + self.stats.data_received(bytes.len()); self.buffer.clear(); Ok(bytes) diff --git a/src/stats.rs b/src/stats.rs new file mode 100644 index 0000000..a7eecec --- /dev/null +++ b/src/stats.rs @@ -0,0 +1,163 @@ +use statsd::Client; +/// Statistics collector and publisher. +use tokio::sync::mpsc::{Receiver, Sender}; + +use std::collections::HashMap; +use std::time::Instant; + +#[derive(Debug)] +pub enum StatisticName { + CheckoutTime, + //QueryRuntime, + //TransactionTime, + Queries, + Transactions, + DataSent, + DataReceived, +} + +#[derive(Debug)] +pub struct Statistic { + pub name: StatisticName, + pub value: i64, +} + +#[derive(Clone, Debug)] +pub struct Reporter { + tx: Sender, +} + +impl Reporter { + pub fn new(tx: Sender) -> Reporter { + Reporter { tx: tx } + } + + pub fn query(&mut self) { + let statistic = Statistic { + name: StatisticName::Queries, + value: 1, + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn transaction(&mut self) { + let statistic = Statistic { + name: StatisticName::Transactions, + value: 1, + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn data_sent(&mut self, amount: usize) { + let statistic = Statistic { + name: StatisticName::DataSent, + value: amount as i64, + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn data_received(&mut self, amount: usize) { + let statistic = Statistic { + name: StatisticName::DataReceived, + value: amount as i64, + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn checkout_time(&mut self, ms: u128) { + let statistic = Statistic { + name: StatisticName::CheckoutTime, + value: ms as i64, + }; + + let _ = self.tx.try_send(statistic); + } +} + +pub struct Collector { + rx: Receiver, + client: Client, +} + +impl Collector { + pub fn new(rx: Receiver) -> Collector { + Collector { + rx: rx, + client: Client::new("127.0.0.1:8125", "pgcat").unwrap(), + } + } + + pub async fn collect(&mut self) { + let mut stats = HashMap::from([ + ("queries", 0), + ("transactions", 0), + ("data_sent", 0), + ("data_received", 0), + ("checkout_time", 0), + ]); + let mut now = Instant::now(); + + loop { + let stat = match self.rx.recv().await { + Some(stat) => stat, + None => { + println!(">> Statistics collector is shutting down."); + return; + } + }; + + // Some are counters, some are gauges... + match stat.name { + StatisticName::Queries => { + let counter = stats.entry("queries").or_insert(0); + *counter += stat.value; + } + + StatisticName::Transactions => { + let counter = stats.entry("transactions").or_insert(0); + *counter += stat.value; + } + + StatisticName::DataSent => { + let counter = stats.entry("data_sent").or_insert(0); + *counter += stat.value; + } + + StatisticName::DataReceived => { + let counter = stats.entry("data_received").or_insert(0); + *counter += stat.value; + } + + StatisticName::CheckoutTime => { + let counter = stats.entry("checkout_time").or_insert(0); + + // Report max time here + if stat.value > *counter { + *counter = stat.value; + } + } + }; + + // It's been 15 seconds. If there is no traffic, it won't publish anything, + // but it also doesn't matter then. + if now.elapsed().as_secs() > 15 { + let mut pipeline = self.client.pipeline(); + + println!(">> Publishing statistics to StatsD: {:?}", stats); + + for (key, value) in stats.iter_mut() { + pipeline.gauge(key, *value as f64); + *value = 0; + } + + pipeline.send(&self.client); + + now = Instant::now(); + } + } + } +}