From 4aa9c3d3c76502729c83bad8113f5e14c4257277 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sat, 12 Feb 2022 10:16:05 -0800 Subject: [PATCH] Cleaner shutdown (#12) * Cleaner shutdown * mark as bad just in case although im pretty sure we dont need it * server session duration * test clean shutdown * ah --- .circleci/config.yml | 4 +- .circleci/run_tests.sh | 3 ++ src/main.rs | 114 +++++++++++++++++++++++------------------ src/server.rs | 29 +++++++++++ 4 files changed, 97 insertions(+), 53 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index bd5b396..5feb5c5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -25,7 +25,7 @@ jobs: key: cargo-lock-2-{{ checksum "Cargo.lock" }} - run: name: "Install dependencies" - command: "sudo apt-get update && sudo apt-get install -y postgresql-contrib-12 postgresql-client-12" + command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12" - run: name: "Build" command: "cargo build" @@ -47,4 +47,4 @@ jobs: workflows: build: jobs: - - build \ No newline at end of file + - build diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index c24a59b..fc44825 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -26,3 +26,6 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > / # Replica/primary selection & more sharding tests psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null + +# Attempt clean shut down +killall pgcat -s SIGINT diff --git a/src/main.rs b/src/main.rs index 58e6778..29b0a2c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,7 @@ extern crate toml; use regex::Regex; use tokio::net::TcpListener; +use tokio::signal; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -108,61 +109,72 @@ async fn main() { println!("> Waiting for clients..."); - loop { - let pool = pool.clone(); - let client_server_map = client_server_map.clone(); - let server_info = server_info.clone(); - - let (socket, addr) = match listener.accept().await { - Ok((socket, addr)) => (socket, addr), - Err(err) => { - println!("> Listener: {:?}", err); - continue; - } - }; - - // Client goes to another thread, bye. - tokio::task::spawn(async move { - let start = chrono::offset::Utc::now().naive_utc(); - - println!(">> Client {:?} connected", addr); - - match client::Client::startup( - socket, - client_server_map, - transaction_mode, - default_server_role, - server_info, - ) - .await - { - Ok(mut client) => { - println!(">> Client {:?} authenticated successfully!", addr); - - match client.handle(pool).await { - Ok(()) => { - let duration = chrono::offset::Utc::now().naive_utc() - start; - - println!( - ">> Client {:?} disconnected, session duration: {}", - addr, - format_duration(&duration) - ); - } - - Err(err) => { - println!(">> Client disconnected with error: {:?}", err); - client.release(); - } - } - } + // Main app runs here. + tokio::task::spawn(async move { + loop { + let pool = pool.clone(); + let client_server_map = client_server_map.clone(); + let server_info = server_info.clone(); + let (socket, addr) = match listener.accept().await { + Ok((socket, addr)) => (socket, addr), Err(err) => { - println!(">> Error: {:?}", err); + println!("> Listener: {:?}", err); + continue; } }; - }); - } + + // Client goes to another thread, bye. + tokio::task::spawn(async move { + let start = chrono::offset::Utc::now().naive_utc(); + + println!(">> Client {:?} connected", addr); + + match client::Client::startup( + socket, + client_server_map, + transaction_mode, + default_server_role, + server_info, + ) + .await + { + Ok(mut client) => { + println!(">> Client {:?} authenticated successfully!", addr); + + match client.handle(pool).await { + Ok(()) => { + let duration = chrono::offset::Utc::now().naive_utc() - start; + + println!( + ">> Client {:?} disconnected, session duration: {}", + addr, + format_duration(&duration) + ); + } + + Err(err) => { + println!(">> Client disconnected with error: {:?}", err); + client.release(); + } + } + } + + Err(err) => { + println!(">> Error: {:?}", err); + } + }; + }); + } + }); + + // Setup shut down sequence + match signal::ctrl_c().await { + Ok(()) => {} + Err(err) => { + eprintln!("Unable to listen for shutdown signal: {}", err); + } + }; } /// Format chrono::Duration to be more human-friendly. diff --git a/src/server.rs b/src/server.rs index 13f362a..ef3c611 100644 --- a/src/server.rs +++ b/src/server.rs @@ -49,7 +49,11 @@ pub struct Server { // Mapping of clients and servers used for query cancellation. client_server_map: ClientServerMap, + // Server role, e.g. primary or replica. role: Role, + + // Server connected at + connected_at: chrono::naive::NaiveDateTime, } impl Server { @@ -193,6 +197,7 @@ impl Server { bad: false, client_server_map: client_server_map, role: role, + connected_at: chrono::offset::Utc::now().naive_utc(), }); } @@ -417,3 +422,27 @@ impl Server { } } } + +impl Drop for Server { + // Try to do a clean shut down. + fn drop(&mut self) { + let mut bytes = BytesMut::with_capacity(4); + bytes.put_u8(b'X'); + bytes.put_i32(4); + + match self.write.try_write(&bytes) { + Ok(n) => (), + Err(_) => (), + }; + + self.bad = true; + + let now = chrono::offset::Utc::now().naive_utc(); + let duration = now - self.connected_at; + + println!( + ">> Server connection closed, session duration: {}", + crate::format_duration(&duration) + ); + } +}