From de800b8a107c0b8a053012f26a1b3a1a1177948f Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sat, 5 Feb 2022 13:25:03 -0800 Subject: [PATCH] comments --- src/pool.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index 57a7b7e..e68d6c8 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -1,3 +1,4 @@ +/// Pooling and failover and banlist. use async_trait::async_trait; use bb8::{ManageConnection, PooledConnection}; use chrono::naive::NaiveDateTime; @@ -17,6 +18,10 @@ pub type BanList = Arc>>; pub type Counter = Arc; pub type ClientServerMap = Arc>>; +// 60 seconds of ban time. +// After that, the replica will be allowed to serve traffic again. +const BAN_TIME: i64 = 60; + pub struct ServerPool { replica_pool: ReplicaPool, user: User, @@ -103,7 +108,7 @@ impl ManageConnection for ServerPool { } } -/// A collection of servers, which could either be a single primary, +/// A collection of addresses, which could either be a single primary, /// many sharded primaries or replicas. #[derive(Clone)] pub struct ReplicaPool { @@ -113,6 +118,7 @@ pub struct ReplicaPool { } impl ReplicaPool { + /// Create a new replica pool. Addresses must be known in advance. pub async fn new(addresses: Vec
) -> ReplicaPool { ReplicaPool { addresses: addresses, @@ -121,6 +127,9 @@ impl ReplicaPool { } } + /// Ban an address (i.e. replica). It no longer will serve + /// traffic for any new transactions. Existing transactions on that replica + /// will finish successfully or error out to the clients. pub fn ban(&self, address: &Address) { println!(">> Banning {:?}", address); let now = chrono::offset::Utc::now().naive_utc(); @@ -128,11 +137,15 @@ impl ReplicaPool { guard.insert(address.clone(), now); } + /// Clear the replica to receive traffic again. Takes effect immediately + /// for all new transactions. pub fn unban(&self, address: &Address) { let mut guard = self.banlist.lock().unwrap(); guard.remove(address); } + /// Check if a replica can serve traffic. If all replicas are banned, + /// we unban all of them. Better to try then not to. pub fn is_banned(&self, address: &Address) -> bool { let mut guard = self.banlist.lock().unwrap(); @@ -140,7 +153,7 @@ impl ReplicaPool { if guard.len() == self.addresses.len() { guard.clear(); drop(guard); - println!(">> Unbanning all"); + println!(">> Unbanning all replicas."); return false; } @@ -148,7 +161,7 @@ impl ReplicaPool { match guard.get(address) { Some(timestamp) => { let now = chrono::offset::Utc::now().naive_utc(); - if now.timestamp() - timestamp.timestamp() > 60 { + if now.timestamp() - timestamp.timestamp() > BAN_TIME { // 1 minute guard.remove(address); false @@ -161,6 +174,10 @@ impl ReplicaPool { } } + /// Get a replica to route the query to. + /// Will attempt to fetch a healthy replica. It will also + /// round-robin them for reasonably equal load. Round-robin is done + /// per transaction. pub fn get(&self) -> Address { loop { // We'll never hit a 64-bit overflow right....right? :-)