diff --git a/src/client.rs b/src/client.rs index cbb9872..dd05786 100644 --- a/src/client.rs +++ b/src/client.rs @@ -57,6 +57,7 @@ pub struct Client { default_server_role: Option, // Client parameters, e.g. user, client_encoding, etc. + #[allow(dead_code)] parameters: HashMap, // Statistics @@ -302,8 +303,11 @@ impl Client { // Release server if self.transaction_mode { + self.stats.client_idle(); + shard = None; role = self.default_server_role; + break; } } @@ -371,8 +375,11 @@ impl Client { self.stats.transaction(); if self.transaction_mode { + self.stats.client_idle(); + shard = None; role = self.default_server_role; + break; } } diff --git a/src/config.rs b/src/config.rs index d2e050b..1a3f22b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,6 +17,7 @@ pub enum Role { pub struct Address { pub host: String, pub port: String, + pub shard: usize, pub role: Role, } diff --git a/src/pool.rs b/src/pool.rs index 62710eb..3cf0c4e 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -57,7 +57,7 @@ impl ConnectionPool { let mut pools = Vec::new(); let mut replica_addresses = Vec::new(); - for server in &shard.servers { + for (idx, server) in shard.servers.iter().enumerate() { let role = match server.2.as_ref() { "primary" => Role::Primary, "replica" => Role::Replica, @@ -71,6 +71,7 @@ impl ConnectionPool { host: server.0.clone(), port: server.1.to_string(), role: role, + shard: idx, }; let manager = ServerPool::new( @@ -165,6 +166,9 @@ impl ConnectionPool { None => 0, // TODO: pick a shard at random }; + // We are waiting for a server now. + self.stats.client_waiting(); + let addresses = &self.addresses[shard]; // Make sure if a specific role is requested, it's available in the pool. @@ -237,7 +241,8 @@ impl ConnectionPool { }; if !with_health_check { - self.stats.checkout_time(now.elapsed().as_millis()); + self.stats.checkout_time(now.elapsed().as_micros()); + self.stats.client_active(); return Ok((conn, address.clone())); } @@ -253,7 +258,8 @@ impl ConnectionPool { // Check if health check succeeded Ok(res) => match res { Ok(_) => { - self.stats.checkout_time(now.elapsed().as_millis()); + self.stats.checkout_time(now.elapsed().as_micros()); + self.stats.client_active(); return Ok((conn, address.clone())); } Err(_) => { @@ -385,13 +391,10 @@ impl ManageConnection for ServerPool { println!(">> Creating a new connection for the pool"); Server::startup( - &self.address.host, - &self.address.port, - &self.user.name, - &self.user.password, + &self.address, + &self.user, &self.database, self.client_server_map.clone(), - self.address.role, self.stats.clone(), ) .await diff --git a/src/server.rs b/src/server.rs index ce55d9c..c1cacfc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,7 +8,7 @@ use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; -use crate::config::{Address, Role}; +use crate::config::{Address, User}; use crate::errors::Error; use crate::messages::*; use crate::stats::Reporter; @@ -16,11 +16,9 @@ use crate::ClientServerMap; /// Server state. pub struct Server { - // Server host, e.g. localhost - host: String, - - // Server port: e.g. 5432 - port: String, + // Server host, e.g. localhost, + // port, e.g. 5432, and role, e.g. primary or replica. + address: Address, // Buffered read socket read: BufReader, @@ -50,9 +48,6 @@ pub struct Server { // Mapping of clients and servers used for query cancellation. client_server_map: ClientServerMap, - // Server role, e.g. primary or replica. - role: Role, - // Server connected at connected_at: chrono::naive::NaiveDateTime, @@ -64,25 +59,23 @@ impl Server { /// Pretend to be the Postgres client and connect to the server given host, port and credentials. /// Perform the authentication and return the server in a ready-for-query mode. pub async fn startup( - host: &str, - port: &str, - user: &str, - password: &str, + address: &Address, + user: &User, database: &str, client_server_map: ClientServerMap, - role: Role, stats: Reporter, ) -> Result { - let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await { - Ok(stream) => stream, - Err(err) => { - println!(">> Could not connect to server: {}", err); - return Err(Error::SocketError); - } - }; + let mut stream = + match TcpStream::connect(&format!("{}:{}", &address.host, &address.port)).await { + Ok(stream) => stream, + Err(err) => { + println!(">> Could not connect to server: {}", err); + return Err(Error::SocketError); + } + }; // Send the startup packet. - startup(&mut stream, user, database).await?; + startup(&mut stream, &user.name, database).await?; let mut server_info = BytesMut::with_capacity(25); let mut backend_id: i32 = 0; @@ -117,7 +110,8 @@ impl Server { Err(_) => return Err(Error::SocketError), }; - md5_password(&mut stream, user, password, &salt[..]).await?; + md5_password(&mut stream, &user.name, &user.password, &salt[..]) + .await?; } // Authentication handshake complete. @@ -189,8 +183,7 @@ impl Server { let (read, write) = stream.into_split(); return Ok(Server { - host: host.to_string(), - port: port.to_string(), + address: address.clone(), read: BufReader::new(read), write: write, buffer: BytesMut::with_capacity(8196), @@ -201,7 +194,6 @@ impl Server { data_available: false, bad: false, client_server_map: client_server_map, - role: role, connected_at: chrono::offset::Utc::now().naive_utc(), stats: stats, }); @@ -382,8 +374,8 @@ impl Server { ( self.backend_id, self.secret_key, - self.host.clone(), - self.port.clone(), + self.address.host.clone(), + self.address.port.clone(), ), ); } @@ -422,11 +414,7 @@ impl Server { } pub fn address(&self) -> Address { - Address { - host: self.host.to_string(), - port: self.port.to_string(), - role: self.role, - } + self.address.clone() } } diff --git a/src/stats.rs b/src/stats.rs index a7eecec..3f93b3f 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -14,6 +14,9 @@ pub enum StatisticName { Transactions, DataSent, DataReceived, + ClientsWaiting, + ClientsActive, + ClientsIdle, } #[derive(Debug)] @@ -76,6 +79,54 @@ impl Reporter { let _ = self.tx.try_send(statistic); } + + pub fn client_waiting(&mut self) { + let statistic = Statistic { + name: StatisticName::ClientsWaiting, + value: 1, + }; + + let _ = self.tx.try_send(statistic); + + let statistic = Statistic { + name: StatisticName::ClientsIdle, + value: -1, + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn client_active(&mut self) { + let statistic = Statistic { + name: StatisticName::ClientsWaiting, + value: -1, + }; + + let _ = self.tx.try_send(statistic); + + let statistic = Statistic { + name: StatisticName::ClientsActive, + value: 1, + }; + + let _ = self.tx.try_send(statistic); + } + + pub fn client_idle(&mut self) { + let statistic = Statistic { + name: StatisticName::ClientsActive, + value: -1, + }; + + let _ = self.tx.try_send(statistic); + + let statistic = Statistic { + name: StatisticName::ClientsIdle, + value: 1, + }; + + let _ = self.tx.try_send(statistic); + } } pub struct Collector { @@ -93,12 +144,18 @@ impl Collector { pub async fn collect(&mut self) { let mut stats = HashMap::from([ - ("queries", 0), - ("transactions", 0), - ("data_sent", 0), - ("data_received", 0), - ("checkout_time", 0), + ("total_query_count", 0), + ("total_xact_count", 0), + ("total_sent", 0), + ("total_received", 0), + ("total_wait_time", 0), + ("maxwait_us", 0), + ("maxwait", 0), + ("cl_waiting", 0), + ("cl_active", 0), + ("cl_idle", 0), ]); + let mut now = Instant::now(); loop { @@ -113,32 +170,61 @@ impl Collector { // Some are counters, some are gauges... match stat.name { StatisticName::Queries => { - let counter = stats.entry("queries").or_insert(0); + let counter = stats.entry("total_query_count").or_insert(0); *counter += stat.value; } StatisticName::Transactions => { - let counter = stats.entry("transactions").or_insert(0); + let counter = stats.entry("total_xact_count").or_insert(0); *counter += stat.value; } StatisticName::DataSent => { - let counter = stats.entry("data_sent").or_insert(0); + let counter = stats.entry("total_sent").or_insert(0); *counter += stat.value; } StatisticName::DataReceived => { - let counter = stats.entry("data_received").or_insert(0); + let counter = stats.entry("total_received").or_insert(0); *counter += stat.value; } StatisticName::CheckoutTime => { - let counter = stats.entry("checkout_time").or_insert(0); + let counter = stats.entry("total_wait_time").or_insert(0); + *counter += stat.value; + + let counter = stats.entry("maxwait_us").or_insert(0); // Report max time here if stat.value > *counter { *counter = stat.value; } + + let counter = stats.entry("maxwait").or_insert(0); + let seconds = *counter / 1_000_000; + + if seconds > *counter { + *counter = seconds; + } + } + + StatisticName::ClientsActive => { + let counter = stats.entry("cl_active").or_insert(0); + + *counter += stat.value; + *counter = std::cmp::max(*counter, 0); + } + + StatisticName::ClientsWaiting => { + let counter = stats.entry("cl_waiting").or_insert(0); + *counter += stat.value; + *counter = std::cmp::max(*counter, 0); + } + + StatisticName::ClientsIdle => { + let counter = stats.entry("cl_idle").or_insert(0); + *counter += stat.value; + *counter = std::cmp::max(*counter, 0); } };