diff --git a/README.md b/README.md index 8624453..577c254 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,9 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su You can just use PgBench to test your changes: ``` -pgbench -i -h 127.0.0.1 -p 5433 && \ -pgbench -t 1000 -p 5433 -h 127.0.0.1 --protocol simple && \ -pgbench -t 1000 -p 5433 -h 127.0.0.1 --protocol extended +pgbench -i -h 127.0.0.1 -p 6432 && \ +pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol simple && \ +pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended ``` ## Features @@ -75,7 +75,7 @@ to make this choice :-). You can set up PgBench locally through PgCat: ``` -pgbench -h 127.0.0.1 -p 5433 -i +pgbench -h 127.0.0.1 -p 6432 -i ``` Coincidentally, this uses `COPY` so you can test if that works. @@ -83,7 +83,25 @@ Coincidentally, this uses `COPY` so you can test if that works. ### PgBouncer ``` -pgbench -h 127.0.0.1 -p 6432 --protocol extended -t 1000 +$ pgbench -i -h 127.0.0.1 -p 6432 && pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol simple && pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended +dropping old tables... +creating tables... +generating data... +100000 of 100000 tuples (100%) done (elapsed 0.01 s, remaining 0.00 s) +vacuuming... +creating primary keys... +done. +starting vacuum...end. +transaction type: +scaling factor: 1 +query mode: simple +number of clients: 1 +number of threads: 1 +number of transactions per client: 1000 +number of transactions actually processed: 1000/1000 +latency average = 1.089 ms +tps = 918.687098 (including connections establishing) +tps = 918.847790 (excluding connections establishing) starting vacuum...end. 
transaction type: scaling factor: 1 @@ -92,15 +110,34 @@ number of clients: 1 number of threads: 1 number of transactions per client: 1000 number of transactions actually processed: 1000/1000 -latency average = 1.116 ms -tps = 895.900600 (including connections establishing) -tps = 896.115205 (excluding connections establishing) +latency average = 1.136 ms +tps = 880.622009 (including connections establishing) +tps = 880.769550 (excluding connections establishing) ``` ### PgCat + ``` -pgbench -h 127.0.0.1 -p 5433 --protocol extended -t 1000 +$ pgbench -i -h 127.0.0.1 -p 6432 && pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol simple && pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended +dropping old tables... +creating tables... +generating data... +100000 of 100000 tuples (100%) done (elapsed 0.01 s, remaining 0.00 s) +vacuuming... +creating primary keys... +done. +starting vacuum...end. +transaction type: +scaling factor: 1 +query mode: simple +number of clients: 1 +number of threads: 1 +number of transactions per client: 1000 +number of transactions actually processed: 1000/1000 +latency average = 1.142 ms +tps = 875.645437 (including connections establishing) +tps = 875.799995 (excluding connections establishing) starting vacuum...end. 
transaction type: scaling factor: 1 @@ -109,15 +146,36 @@ number of clients: 1 number of threads: 1 number of transactions per client: 1000 number of transactions actually processed: 1000/1000 -latency average = 1.152 ms -tps = 867.761579 (including connections establishing) -tps = 867.881391 (excluding connections establishing) +latency average = 1.181 ms +tps = 846.539176 (including connections establishing) +tps = 846.713636 (excluding connections establishing) ``` ### Direct Postgres ``` -pgbench -h 127.0.0.1 -p 5432 --protocol extended -t 1000 +$ pgbench -i -h 127.0.0.1 -p 5432 && pgbench -t 1000 -p 5432 -h 127.0.0.1 --protocol simple && pgbench -t 1000 -p +5432 -h 127.0.0.1 --protocol extended +Password: +dropping old tables... +creating tables... +generating data... +100000 of 100000 tuples (100%) done (elapsed 0.01 s, remaining 0.00 s) +vacuuming... +creating primary keys... +done. +Password: +starting vacuum...end. +transaction type: +scaling factor: 1 +query mode: simple +number of clients: 1 +number of threads: 1 +number of transactions per client: 1000 +number of transactions actually processed: 1000/1000 +latency average = 0.902 ms +tps = 1109.014867 (including connections establishing) +tps = 1112.318595 (excluding connections establishing) Password: starting vacuum...end. 
transaction type: @@ -127,7 +185,7 @@ number of clients: 1 number of threads: 1 number of transactions per client: 1000 number of transactions actually processed: 1000/1000 -latency average = 0.944 ms -tps = 1059.007346 (including connections establishing) -tps = 1061.700877 (excluding connections establishing) +latency average = 0.931 ms +tps = 1074.017747 (including connections establishing) +tps = 1077.121752 (excluding connections establishing) ``` \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index ba6f76b..dcab2b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,7 +63,7 @@ async fn main() { }, Address { host: "localhost".to_string(), - port: "5432".to_string(), + port: "5433".to_string(), }, ]; diff --git a/src/pool.rs b/src/pool.rs index e4097b9..5515267 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -14,18 +14,23 @@ use std::sync::{ }; // Banlist: bad servers go in here. -pub type BanList = Arc>>; +pub type BanList = Arc>>>; pub type Counter = Arc; pub type ClientServerMap = Arc>>; // 60 seconds of ban time. // After that, the replica will be allowed to serve traffic again. const BAN_TIME: i64 = 60; -// -// Poor man's config -// + +// DB pool size (per actual database server) const POOL_SIZE: u32 = 15; +// 5 seconds to connect before we give up +const CONNECT_TIMEOUT: u64 = 5000; + +// How much time to give the server to answer a SELECT 1 query. 
+const HEALTHCHECK_TIMEOUT: u64 = 1000; + #[derive(Clone)] pub struct ConnectionPool { databases: Vec>>, @@ -52,7 +57,7 @@ impl ConnectionPool { ); let pool = Pool::builder() .max_size(POOL_SIZE) - .connection_timeout(std::time::Duration::from_millis(5000)) + .connection_timeout(std::time::Duration::from_millis(CONNECT_TIMEOUT)) .test_on_check_out(false) .build(manager) .await @@ -65,7 +70,7 @@ impl ConnectionPool { databases: vec![databases], addresses: vec![addresses], round_robin: Arc::new(AtomicUsize::new(0)), - banlist: Arc::new(Mutex::new(HashMap::new())), + banlist: Arc::new(Mutex::new(vec![HashMap::new()])), } } @@ -87,16 +92,17 @@ impl ConnectionPool { self.round_robin.fetch_add(1, Ordering::SeqCst) % self.databases[shard].len(); let address = self.addresses[shard][index].clone(); - if self.is_banned(&address) { + if self.is_banned(&address, shard) { continue; } // Check if we can connect + // TODO: implement query wait timeout, i.e. time to get a conn from the pool let mut conn = match self.databases[shard][index].get().await { Ok(conn) => conn, Err(err) => { println!(">> Banning replica {}, error: {:?}", index, err); - self.ban(&address); + self.ban(&address, shard); continue; } }; @@ -109,7 +115,7 @@ impl ConnectionPool { let server = &mut *conn; match tokio::time::timeout( - tokio::time::Duration::from_millis(1000), + tokio::time::Duration::from_millis(HEALTHCHECK_TIMEOUT), server.query("SELECT 1"), ) .await @@ -120,7 +126,7 @@ impl ConnectionPool { ">> Banning replica {} because of failed health check", index ); - self.ban(&address); + self.ban(&address, shard); continue; } } @@ -130,27 +136,27 @@ impl ConnectionPool { /// Ban an address (i.e. replica). It no longer will serve /// traffic for any new transactions. Existing transactions on that replica /// will finish successfully or error out to the clients. 
- pub fn ban(&self, address: &Address) { + pub fn ban(&self, address: &Address, shard: usize) { println!(">> Banning {:?}", address); let now = chrono::offset::Utc::now().naive_utc(); let mut guard = self.banlist.lock().unwrap(); - guard.insert(address.clone(), now); + guard[shard].insert(address.clone(), now); } /// Clear the replica to receive traffic again. Takes effect immediately /// for all new transactions. - pub fn unban(&self, address: &Address) { + pub fn unban(&self, address: &Address, shard: usize) { let mut guard = self.banlist.lock().unwrap(); - guard.remove(address); + guard[shard].remove(address); } /// Check if a replica can serve traffic. If all replicas are banned, /// we unban all of them. Better to try than not to. - pub fn is_banned(&self, address: &Address) -> bool { + pub fn is_banned(&self, address: &Address, shard: usize) -> bool { let mut guard = self.banlist.lock().unwrap(); // Everything is banned, nothing is banned - if guard.len() == self.databases.len() { + if guard[shard].len() == self.databases[shard].len() { guard.clear(); drop(guard); println!(">> Unbanning all replicas."); } // I expect this to miss 99.9999% of the time. - match guard.get(address) { + match guard[shard].get(address) { Some(timestamp) => { let now = chrono::offset::Utc::now().naive_utc(); if now.timestamp() - timestamp.timestamp() > BAN_TIME { // 1 minute - guard.remove(address); + guard[shard].remove(address); false } else { true