This commit is contained in:
Lev Kokotov
2022-02-08 17:08:17 -08:00
parent 95c2d593cc
commit edf6a69ca4
7 changed files with 22 additions and 70 deletions

View File

@@ -2,7 +2,6 @@
/// We are pretending to the server in this scenario, /// We are pretending to the server in this scenario,
/// and this module implements that. /// and this module implements that.
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use rand::{distributions::Alphanumeric, Rng};
use regex::Regex; use regex::Regex;
use tokio::io::{AsyncReadExt, BufReader}; use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};

View File

@@ -1,10 +1,10 @@
use serde_derive::Deserialize; use serde_derive::Deserialize;
use std::collections::HashMap;
use std::path::Path;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use toml; use toml;
use std::collections::HashMap;
use crate::errors::Error; use crate::errors::Error;
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
@@ -62,16 +62,6 @@ pub async fn parse(path: &str) -> Result<Config, Error> {
} }
}; };
// let config: toml::Value = match toml::from_str(&contents) {
// Ok(config) => config,
// Err(err) => {
// println!("> Config error: {:?}", err);
// return Err(Error::BadConfig);
// }
// };
// println!("Config: {:?}", config);
let config: Config = match toml::from_str(&contents) { let config: Config = match toml::from_str(&contents) {
Ok(config) => config, Ok(config) => config,
Err(err) => { Err(err) => {
@@ -82,3 +72,16 @@ pub async fn parse(path: &str) -> Result<Config, Error> {
Ok(config) Ok(config)
} }
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_config() {
let config = parse("pgcat.toml").await.unwrap();
assert_eq!(config.general.pool_size, 15);
assert_eq!(config.shards.len(), 3);
assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1");
}
}

View File

@@ -1,11 +1,11 @@
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Error { pub enum Error {
SocketError, SocketError,
ClientDisconnected, // ClientDisconnected,
ClientBadStartup, ClientBadStartup,
ProtocolSyncError, ProtocolSyncError,
ServerError, ServerError,
ServerTimeout, // ServerTimeout,
DirtyServer, // DirtyServer,
BadConfig, BadConfig,
} }

View File

@@ -37,7 +37,6 @@ mod sharding;
// Support for query cancellation: this maps our process_ids and // Support for query cancellation: this maps our process_ids and
// secret keys to the backend's. // secret keys to the backend's.
use config::{Address, User};
use pool::{ClientServerMap, ConnectionPool}; use pool::{ClientServerMap, ConnectionPool};
/// Main! /// Main!
@@ -48,6 +47,7 @@ async fn main() {
let config = match config::parse("pgcat.toml").await { let config = match config::parse("pgcat.toml").await {
Ok(config) => config, Ok(config) => config,
Err(err) => { Err(err) => {
println!("> Config parse error: {:?}", err);
return; return;
} }
}; };

View File

@@ -18,19 +18,6 @@ pub type BanList = Arc<Mutex<Vec<HashMap<Address, NaiveDateTime>>>>;
pub type Counter = Arc<AtomicUsize>; pub type Counter = Arc<AtomicUsize>;
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>; pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
// 60 seconds of ban time.
// After that, the replica will be allowed to serve traffic again.
const BAN_TIME: i64 = 60;
// DB pool size (per actual database server)
const POOL_SIZE: u32 = 15;
// 5 seconds to connect before we give up
const CONNECT_TIMEOUT: u64 = 5000;
// How much time to give the server to answer a SELECT 1 query.
const HEALTHCHECK_TIMEOUT: u64 = 1000;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ConnectionPool { pub struct ConnectionPool {
databases: Vec<Vec<Pool<ServerPool>>>, databases: Vec<Vec<Pool<ServerPool>>>,
@@ -42,43 +29,6 @@ pub struct ConnectionPool {
} }
impl ConnectionPool { impl ConnectionPool {
// Construct the connection pool for a single-shard cluster.
pub async fn new(
addresses: Vec<Address>,
user: User,
database: &str,
client_server_map: ClientServerMap,
) -> ConnectionPool {
let mut databases = Vec::new();
for address in &addresses {
let manager = ServerPool::new(
address.clone(),
user.clone(),
database,
client_server_map.clone(),
);
let pool = Pool::builder()
.max_size(POOL_SIZE)
.connection_timeout(std::time::Duration::from_millis(CONNECT_TIMEOUT))
.test_on_check_out(false)
.build(manager)
.await
.unwrap();
databases.push(pool);
}
ConnectionPool {
databases: vec![databases],
addresses: vec![addresses],
round_robin: Arc::new(AtomicUsize::new(0)),
banlist: Arc::new(Mutex::new(vec![HashMap::new()])),
healthcheck_timeout: HEALTHCHECK_TIMEOUT,
ban_time: BAN_TIME,
}
}
/// Construct the connection pool from a config file. /// Construct the connection pool from a config file.
pub async fn from_config(config: Config, client_server_map: ClientServerMap) -> ConnectionPool { pub async fn from_config(config: Config, client_server_map: ClientServerMap) -> ConnectionPool {
let mut shards = Vec::new(); let mut shards = Vec::new();
@@ -222,7 +172,7 @@ impl ConnectionPool {
/// Clear the replica to receive traffic again. Takes effect immediately /// Clear the replica to receive traffic again. Takes effect immediately
/// for all new transactions. /// for all new transactions.
pub fn unban(&self, address: &Address, shard: usize) { pub fn _unban(&self, address: &Address, shard: usize) {
let mut guard = self.banlist.lock().unwrap(); let mut guard = self.banlist.lock().unwrap();
guard[shard].remove(address); guard[shard].remove(address);
} }

View File

@@ -14,7 +14,7 @@ impl Sharder {
/// Use SHA1 to pick a shard for the key. The key can be anything, /// Use SHA1 to pick a shard for the key. The key can be anything,
/// including an int or a string. /// including an int or a string.
pub fn sha1(&self, key: &[u8]) -> usize { pub fn _sha1(&self, key: &[u8]) -> usize {
let mut hasher = Sha1::new(); let mut hasher = Sha1::new();
hasher.update(key); hasher.update(key);
let result = hasher.finalize_reset(); let result = hasher.finalize_reset();