From 20ceb729a0372de081aca497f13e86803b4034b4 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sat, 12 Feb 2022 09:24:24 -0800 Subject: [PATCH] print session duration; connect to all servers when validating (#11) --- src/main.rs | 53 ++++++++++++++++++++++++++++++++++++++++++++++++----- src/pool.rs | 31 +++++++++++++++++++------------ 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/src/main.rs b/src/main.rs index 37f820f..58e6778 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,10 +123,9 @@ async fn main() { // Client goes to another thread, bye. tokio::task::spawn(async move { - println!( - ">> Client {:?} connected, transaction pooling: {}", - addr, transaction_mode - ); + let start = chrono::offset::Utc::now().naive_utc(); + + println!(">> Client {:?} connected", addr); match client::Client::startup( socket, @@ -142,7 +141,13 @@ async fn main() { match client.handle(pool).await { Ok(()) => { - println!(">> Client {:?} disconnected.", addr); + let duration = chrono::offset::Utc::now().naive_utc() - start; + + println!( + ">> Client {:?} disconnected, session duration: {}", + addr, + format_duration(&duration) + ); } Err(err) => { @@ -159,3 +164,41 @@ async fn main() { }); } } + +/// Format chrono::Duration to be more human-friendly. +/// +/// # Arguments +/// +/// * `duration` - A duration of time +fn format_duration(duration: &chrono::Duration) -> String { + let seconds = { + let seconds = duration.num_seconds() % 60; + if seconds < 10 { + format!("0{}", seconds) + } else { + format!("{}", seconds) + } + }; + + let minutes = { + let minutes = duration.num_minutes() % 60; + if minutes < 10 { + format!("0{}", minutes) + } else { + format!("{}", minutes) + } + }; + + let hours = { + let hours = duration.num_hours() % 24; + if hours < 10 { + format!("0{}", hours) + } else { + format!("{}", hours) + } + }; + + let days = duration.num_days().to_string(); + + format!("{}d {}:{}:{}", days, hours, minutes, seconds) +} diff --git a/src/pool.rs b/src/pool.rs index 3432330..a4f4bb3 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -109,24 +109,27 @@ impl ConnectionPool { /// Connect to all shards and grab server information. /// Return server information we will pass to the clients /// when they connect. + /// This also warms up the pool for clients that connect when + /// the pooler starts up. pub async fn validate(&mut self) -> Result { let mut server_infos = Vec::new(); for shard in 0..self.shards() { - // TODO: query all primary and replicas in the shard configuration. - let connection = match self.get(Some(shard), None).await { - Ok(conn) => conn, - Err(err) => { - println!("> Shard {} down or misconfigured.", shard); - return Err(err); - } - }; + for _ in 0..self.replicas(shard) { + let connection = match self.get(Some(shard), None).await { + Ok(conn) => conn, + Err(err) => { + println!("> Shard {} down or misconfigured.", shard); + return Err(err); + } + }; - let mut proxy = connection.0; - let _address = connection.1; - let server = &mut *proxy; + let mut proxy = connection.0; + let _address = connection.1; + let server = &mut *proxy; - server_infos.push(server.server_info()); + server_infos.push(server.server_info()); + } } // TODO: compare server information to make sure @@ -326,6 +329,10 @@ impl ConnectionPool { pub fn shards(&self) -> usize { self.databases.len() } + + pub fn replicas(&self, shard: usize) -> usize { + self.addresses[shard].len() + } } pub struct ServerPool {