From edf6a69ca46212a6b6b06d9d2ada44e1c44a2a07 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Tue, 8 Feb 2022 17:08:17 -0800 Subject: [PATCH] warnings --- Cargo.toml | 2 +- src/client.rs | 1 - src/config.rs | 27 +++++++++++++------------ src/errors.rs | 6 +++--- src/main.rs | 2 +- src/pool.rs | 52 +------------------------------------------------ src/sharding.rs | 2 +- 7 files changed, 22 insertions(+), 70 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 019c153..d866198 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,4 +17,4 @@ sha-1 = "*" toml = "*" serde = "*" serde_derive = "*" -regex = "1" \ No newline at end of file +regex = "1" diff --git a/src/client.rs b/src/client.rs index 0ba97d0..31b61ff 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,6 @@ /// We are pretending to the server in this scenario, /// and this module implements that. use bytes::{Buf, BufMut, BytesMut}; -use rand::{distributions::Alphanumeric, Rng}; use regex::Regex; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; diff --git a/src/config.rs b/src/config.rs index bfc5619..ed897b2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,10 @@ use serde_derive::Deserialize; -use std::collections::HashMap; -use std::path::Path; use tokio::fs::File; use tokio::io::AsyncReadExt; use toml; +use std::collections::HashMap; + use crate::errors::Error; #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] @@ -62,16 +62,6 @@ pub async fn parse(path: &str) -> Result { } }; - // let config: toml::Value = match toml::from_str(&contents) { - // Ok(config) => config, - // Err(err) => { - // println!("> Config error: {:?}", err); - // return Err(Error::BadConfig); - // } - // }; - - // println!("Config: {:?}", config); - let config: Config = match toml::from_str(&contents) { Ok(config) => config, Err(err) => { @@ -82,3 +72,16 @@ pub async fn parse(path: &str) -> Result { Ok(config) } + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_config() { + let config = parse("pgcat.toml").await.unwrap(); + assert_eq!(config.general.pool_size, 15); + assert_eq!(config.shards.len(), 3); + assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1"); + } +} diff --git a/src/errors.rs b/src/errors.rs index 5ee43e3..3dcbf74 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,11 +1,11 @@ #[derive(Debug, PartialEq)] pub enum Error { SocketError, - ClientDisconnected, + // ClientDisconnected, ClientBadStartup, ProtocolSyncError, ServerError, - ServerTimeout, - DirtyServer, + // ServerTimeout, + // DirtyServer, BadConfig, } diff --git a/src/main.rs b/src/main.rs index 0602419..9b084ec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,7 +37,6 @@ mod sharding; // Support for query cancellation: this maps our process_ids and // secret keys to the backend's. -use config::{Address, User}; use pool::{ClientServerMap, ConnectionPool}; /// Main! @@ -48,6 +47,7 @@ async fn main() { let config = match config::parse("pgcat.toml").await { Ok(config) => config, Err(err) => { + println!("> Config parse error: {:?}", err); return; } }; diff --git a/src/pool.rs b/src/pool.rs index ed267ed..eb06816 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -18,19 +18,6 @@ pub type BanList = Arc>>>; pub type Counter = Arc; pub type ClientServerMap = Arc>>; -// 60 seconds of ban time. -// After that, the replica will be allowed to serve traffic again. -const BAN_TIME: i64 = 60; - -// DB pool size (per actual database server) -const POOL_SIZE: u32 = 15; - -// 5 seconds to connect before we give up -const CONNECT_TIMEOUT: u64 = 5000; - -// How much time to give the server to answer a SELECT 1 query. -const HEALTHCHECK_TIMEOUT: u64 = 1000; - #[derive(Clone, Debug)] pub struct ConnectionPool { databases: Vec>>, @@ -42,43 +29,6 @@ pub struct ConnectionPool { } impl ConnectionPool { - // Construct the connection pool for a single-shard cluster. - pub async fn new( - addresses: Vec
, - user: User, - database: &str, - client_server_map: ClientServerMap, - ) -> ConnectionPool { - let mut databases = Vec::new(); - - for address in &addresses { - let manager = ServerPool::new( - address.clone(), - user.clone(), - database, - client_server_map.clone(), - ); - let pool = Pool::builder() - .max_size(POOL_SIZE) - .connection_timeout(std::time::Duration::from_millis(CONNECT_TIMEOUT)) - .test_on_check_out(false) - .build(manager) - .await - .unwrap(); - - databases.push(pool); - } - - ConnectionPool { - databases: vec![databases], - addresses: vec![addresses], - round_robin: Arc::new(AtomicUsize::new(0)), - banlist: Arc::new(Mutex::new(vec![HashMap::new()])), - healthcheck_timeout: HEALTHCHECK_TIMEOUT, - ban_time: BAN_TIME, - } - } - /// Construct the connection pool from a config file. pub async fn from_config(config: Config, client_server_map: ClientServerMap) -> ConnectionPool { let mut shards = Vec::new(); @@ -222,7 +172,7 @@ impl ConnectionPool { /// Clear the replica to receive traffic again. Takes effect immediately /// for all new transactions. - pub fn unban(&self, address: &Address, shard: usize) { + pub fn _unban(&self, address: &Address, shard: usize) { let mut guard = self.banlist.lock().unwrap(); guard[shard].remove(address); } diff --git a/src/sharding.rs b/src/sharding.rs index b729234..3f32dbe 100644 --- a/src/sharding.rs +++ b/src/sharding.rs @@ -14,7 +14,7 @@ impl Sharder { /// Use SHA1 to pick a shard for the key. The key can be anything, /// including an int or a string. - pub fn sha1(&self, key: &[u8]) -> usize { + pub fn _sha1(&self, key: &[u8]) -> usize { let mut hasher = Sha1::new(); hasher.update(key); let result = hasher.finalize_reset();