This commit is contained in:
Lev Kokotov
2022-02-05 13:25:03 -08:00
parent f41874c249
commit de800b8a10

View File

@@ -1,3 +1,4 @@
/// Pooling and failover and banlist.
use async_trait::async_trait; use async_trait::async_trait;
use bb8::{ManageConnection, PooledConnection}; use bb8::{ManageConnection, PooledConnection};
use chrono::naive::NaiveDateTime; use chrono::naive::NaiveDateTime;
@@ -17,6 +18,10 @@ pub type BanList = Arc<Mutex<HashMap<Address, NaiveDateTime>>>;
pub type Counter = Arc<AtomicUsize>; pub type Counter = Arc<AtomicUsize>;
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>; pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
// Ban duration in seconds (60 seconds = 1 minute).
// Once it has elapsed, the replica is allowed to serve traffic again.
const BAN_TIME: i64 = 60;
pub struct ServerPool { pub struct ServerPool {
replica_pool: ReplicaPool, replica_pool: ReplicaPool,
user: User, user: User,
@@ -103,7 +108,7 @@ impl ManageConnection for ServerPool {
} }
} }
/// A collection of servers, which could either be a single primary, /// A collection of addresses, which could either be a single primary,
/// many sharded primaries or replicas. /// many sharded primaries or replicas.
#[derive(Clone)] #[derive(Clone)]
pub struct ReplicaPool { pub struct ReplicaPool {
@@ -113,6 +118,7 @@ pub struct ReplicaPool {
} }
impl ReplicaPool { impl ReplicaPool {
/// Create a new replica pool. Addresses must be known in advance.
pub async fn new(addresses: Vec<Address>) -> ReplicaPool { pub async fn new(addresses: Vec<Address>) -> ReplicaPool {
ReplicaPool { ReplicaPool {
addresses: addresses, addresses: addresses,
@@ -121,6 +127,9 @@ impl ReplicaPool {
} }
} }
/// Ban an address (i.e. replica). It will no longer serve
/// traffic for any new transactions. Existing transactions on that replica
/// will finish successfully or error out to the clients.
pub fn ban(&self, address: &Address) { pub fn ban(&self, address: &Address) {
println!(">> Banning {:?}", address); println!(">> Banning {:?}", address);
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();
@@ -128,11 +137,15 @@ impl ReplicaPool {
guard.insert(address.clone(), now); guard.insert(address.clone(), now);
} }
/// Clear the replica to receive traffic again. Takes effect immediately
/// for all new transactions.
pub fn unban(&self, address: &Address) { pub fn unban(&self, address: &Address) {
let mut guard = self.banlist.lock().unwrap(); let mut guard = self.banlist.lock().unwrap();
guard.remove(address); guard.remove(address);
} }
/// Check if a replica is banned, i.e. cannot serve traffic. If all replicas
/// are banned, we unban all of them. Better to try than not to.
pub fn is_banned(&self, address: &Address) -> bool { pub fn is_banned(&self, address: &Address) -> bool {
let mut guard = self.banlist.lock().unwrap(); let mut guard = self.banlist.lock().unwrap();
@@ -140,7 +153,7 @@ impl ReplicaPool {
if guard.len() == self.addresses.len() { if guard.len() == self.addresses.len() {
guard.clear(); guard.clear();
drop(guard); drop(guard);
println!(">> Unbanning all"); println!(">> Unbanning all replicas.");
return false; return false;
} }
@@ -148,7 +161,7 @@ impl ReplicaPool {
match guard.get(address) { match guard.get(address) {
Some(timestamp) => { Some(timestamp) => {
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();
if now.timestamp() - timestamp.timestamp() > 60 { if now.timestamp() - timestamp.timestamp() > BAN_TIME {
// 1 minute // 1 minute
guard.remove(address); guard.remove(address);
false false
@@ -161,6 +174,10 @@ impl ReplicaPool {
} }
} }
/// Get a replica to route the query to.
/// Will attempt to fetch a healthy replica. It will also
/// round-robin them for reasonably equal load. Round-robin is done
/// per transaction.
pub fn get(&self) -> Address { pub fn get(&self) -> Address {
loop { loop {
// We'll never hit a 64-bit overflow right....right? :-) // We'll never hit a 64-bit overflow right....right? :-)