From 3a729bb75b53454edcdc8ecb9dbabf994ec4ab04 Mon Sep 17 00:00:00 2001 From: zainkabani <77307340+zainkabani@users.noreply.github.com> Date: Thu, 22 Sep 2022 13:07:02 -0400 Subject: [PATCH] Minor refactor for configs (#172) * Changes shard struct to use vector of ServerConfig * Adds to query router * Change client disconnect with error message to warn instead of debug * Add warning logs for clean up actions --- src/config.rs | 44 ++++++++++++++++++++++---------------------- src/main.rs | 4 ++-- src/pool.rs | 17 ++++------------- src/query_router.rs | 4 ++-- src/server.rs | 2 ++ 5 files changed, 32 insertions(+), 39 deletions(-) diff --git a/src/config.rs b/src/config.rs index 9ef4d17..e4c25a2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,7 +23,9 @@ static CONFIG: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Config /// Server role: primary or replica. #[derive(Clone, PartialEq, Serialize, Deserialize, Hash, std::cmp::Eq, Debug, Copy)] pub enum Role { + #[serde(alias = "primary", alias = "Primary")] Primary, + #[serde(alias = "replica", alias = "Replica")] Replica, } @@ -202,17 +204,28 @@ impl Default for Pool { } } +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)] +pub struct ServerConfig { + pub host: String, + pub port: u16, + pub role: Role, +} + /// Shard configuration. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct Shard { pub database: String, - pub servers: Vec<(String, u16, String)>, + pub servers: Vec, } impl Default for Shard { fn default() -> Shard { Shard { - servers: vec![(String::from("localhost"), 5432, String::from("primary"))], + servers: vec![ServerConfig { + host: String::from("localhost"), + port: 5432, + role: Role::Primary, + }], database: String::from("postgres"), } } @@ -538,23 +551,10 @@ pub async fn parse(path: &str) -> Result<(), Error> { dup_check.insert(server); // Check that we define only zero or one primary. - match server.2.as_ref() { - "primary" => primary_count += 1, + match server.role { + Role::Primary => primary_count += 1, _ => (), }; - - // Check role spelling. - match server.2.as_ref() { - "primary" => (), - "replica" => (), - _ => { - error!( - "Shard {} server role must be either 'primary' or 'replica', got: '{}'", - shard.0, server.2 - ); - return Err(Error::BadConfig); - } - }; } if primary_count > 1 { @@ -617,12 +617,12 @@ mod test { assert_eq!(get_config().pools["simple_db"].users.len(), 1); assert_eq!( - get_config().pools["sharded_db"].shards["0"].servers[0].0, + get_config().pools["sharded_db"].shards["0"].servers[0].host, "127.0.0.1" ); assert_eq!( - get_config().pools["sharded_db"].shards["1"].servers[0].2, - "primary" + get_config().pools["sharded_db"].shards["1"].servers[0].role, + Role::Primary ); assert_eq!( get_config().pools["sharded_db"].shards["1"].database, @@ -640,11 +640,11 @@ mod test { assert_eq!(get_config().pools["sharded_db"].default_role, "any"); assert_eq!( - get_config().pools["simple_db"].shards["0"].servers[0].0, + get_config().pools["simple_db"].shards["0"].servers[0].host, "127.0.0.1" ); assert_eq!( - get_config().pools["simple_db"].shards["0"].servers[0].1, + get_config().pools["simple_db"].shards["0"].servers[0].port, 5432 ); assert_eq!( diff --git a/src/main.rs b/src/main.rs index 6690b56..467527c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,7 +37,7 @@ extern crate tokio; extern crate tokio_rustls; extern crate toml; -use log::{debug, error, info}; +use log::{error, info, warn}; use parking_lot::Mutex; use pgcat::format_duration; use tokio::net::TcpListener; @@ -279,7 +279,7 @@ async fn main() { } } - debug!("Client disconnected with error {:?}", err); + warn!("Client disconnected with error {:?}", err); } }; }); diff --git a/src/pool.rs b/src/pool.rs index c4a424e..f81a1e0 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -143,21 +143,12 @@ impl ConnectionPool { let mut replica_number = 0; for server in shard.servers.iter() { - let role = match server.2.as_ref() { - "primary" => Role::Primary, - "replica" => Role::Replica, - _ => { - error!("Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2); - Role::Replica - } - }; - let address = Address { id: address_id, database: shard.database.clone(), - host: server.0.clone(), - port: server.1 as u16, - role: role, + host: server.host.clone(), + port: server.port, + role: server.role, address_index, replica_number, shard: shard_idx.parse::().unwrap(), @@ -168,7 +159,7 @@ impl ConnectionPool { address_id += 1; address_index += 1; - if role == Role::Replica { + if server.role == Role::Replica { replica_number += 1; } diff --git a/src/query_router.rs b/src/query_router.rs index f9d5f0b..85d4f8b 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -169,8 +169,8 @@ impl QueryRouter { Command::ShowShard => self.shard().to_string(), Command::ShowServerRole => match self.active_role { - Some(Role::Primary) => String::from("primary"), - Some(Role::Replica) => String::from("replica"), + Some(Role::Primary) => Role::Primary.to_string(), + Some(Role::Replica) => Role::Replica.to_string(), None => { if self.query_parser_enabled { String::from("auto") diff --git a/src/server.rs b/src/server.rs index 64886be..dbac9bc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -591,6 +591,7 @@ impl Server { // server connection thrashing if clients repeatedly do this. // Instead, we ROLLBACK that transaction before putting the connection back in the pool if self.in_transaction() { + warn!("Server returned while still in transaction, rolling back transaction"); self.query("ROLLBACK").await?; } @@ -600,6 +601,7 @@ impl Server { // send `DISCARD ALL` if we think the session is altered instead of just sending // it before each checkin. if self.needs_cleanup { + warn!("Server returned with session state altered, discarding state"); self.query("DISCARD ALL").await?; self.needs_cleanup = false; }