mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-25 10:06:28 +00:00
fix banlist
This commit is contained in:
@@ -63,7 +63,7 @@ async fn main() {
|
||||
},
|
||||
Address {
|
||||
host: "localhost".to_string(),
|
||||
port: "5432".to_string(),
|
||||
port: "5433".to_string(),
|
||||
},
|
||||
];
|
||||
|
||||
|
||||
42
src/pool.rs
42
src/pool.rs
@@ -14,18 +14,23 @@ use std::sync::{
|
||||
};
|
||||
|
||||
// Banlist: bad servers go in here.
|
||||
pub type BanList = Arc<Mutex<HashMap<Address, NaiveDateTime>>>;
|
||||
pub type BanList = Arc<Mutex<Vec<HashMap<Address, NaiveDateTime>>>>;
|
||||
pub type Counter = Arc<AtomicUsize>;
|
||||
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
|
||||
|
||||
// 60 seconds of ban time.
|
||||
// After that, the replica will be allowed to serve traffic again.
|
||||
const BAN_TIME: i64 = 60;
|
||||
//
|
||||
// Poor man's config
|
||||
//
|
||||
|
||||
// DB pool size (per actual database server)
|
||||
const POOL_SIZE: u32 = 15;
|
||||
|
||||
// 5 seconds to connect before we give up
|
||||
const CONNECT_TIMEOUT: u64 = 5000;
|
||||
|
||||
// How much time to give the server to answer a SELECT 1 query.
|
||||
const HEALTHCHECK_TIMEOUT: u64 = 1000;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectionPool {
|
||||
databases: Vec<Vec<Pool<ServerPool>>>,
|
||||
@@ -52,7 +57,7 @@ impl ConnectionPool {
|
||||
);
|
||||
let pool = Pool::builder()
|
||||
.max_size(POOL_SIZE)
|
||||
.connection_timeout(std::time::Duration::from_millis(5000))
|
||||
.connection_timeout(std::time::Duration::from_millis(CONNECT_TIMEOUT))
|
||||
.test_on_check_out(false)
|
||||
.build(manager)
|
||||
.await
|
||||
@@ -65,7 +70,7 @@ impl ConnectionPool {
|
||||
databases: vec![databases],
|
||||
addresses: vec![addresses],
|
||||
round_robin: Arc::new(AtomicUsize::new(0)),
|
||||
banlist: Arc::new(Mutex::new(HashMap::new())),
|
||||
banlist: Arc::new(Mutex::new(vec![HashMap::new()])),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,16 +92,17 @@ impl ConnectionPool {
|
||||
self.round_robin.fetch_add(1, Ordering::SeqCst) % self.databases[shard].len();
|
||||
let address = self.addresses[shard][index].clone();
|
||||
|
||||
if self.is_banned(&address) {
|
||||
if self.is_banned(&address, shard) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if we can connect
|
||||
// TODO: implement query wait timeout, i.e. time to get a conn from the pool
|
||||
let mut conn = match self.databases[shard][index].get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
println!(">> Banning replica {}, error: {:?}", index, err);
|
||||
self.ban(&address);
|
||||
self.ban(&address, shard);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -109,7 +115,7 @@ impl ConnectionPool {
|
||||
let server = &mut *conn;
|
||||
|
||||
match tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(1000),
|
||||
tokio::time::Duration::from_millis(HEALTHCHECK_TIMEOUT),
|
||||
server.query("SELECT 1"),
|
||||
)
|
||||
.await
|
||||
@@ -120,7 +126,7 @@ impl ConnectionPool {
|
||||
">> Banning replica {} because of failed health check",
|
||||
index
|
||||
);
|
||||
self.ban(&address);
|
||||
self.ban(&address, shard);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -130,27 +136,27 @@ impl ConnectionPool {
|
||||
/// Ban an address (i.e. replica). It no longer will serve
|
||||
/// traffic for any new transactions. Existing transactions on that replica
|
||||
/// will finish successfully or error out to the clients.
|
||||
pub fn ban(&self, address: &Address) {
|
||||
pub fn ban(&self, address: &Address, shard: usize) {
|
||||
println!(">> Banning {:?}", address);
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
let mut guard = self.banlist.lock().unwrap();
|
||||
guard.insert(address.clone(), now);
|
||||
guard[shard].insert(address.clone(), now);
|
||||
}
|
||||
|
||||
/// Clear the replica to receive traffic again. Takes effect immediately
|
||||
/// for all new transactions.
|
||||
pub fn unban(&self, address: &Address) {
|
||||
pub fn unban(&self, address: &Address, shard: usize) {
|
||||
let mut guard = self.banlist.lock().unwrap();
|
||||
guard.remove(address);
|
||||
guard[shard].remove(address);
|
||||
}
|
||||
|
||||
/// Check if a replica can serve traffic. If all replicas are banned,
|
||||
/// we unban all of them. Better to try then not to.
|
||||
pub fn is_banned(&self, address: &Address) -> bool {
|
||||
pub fn is_banned(&self, address: &Address, shard: usize) -> bool {
|
||||
let mut guard = self.banlist.lock().unwrap();
|
||||
|
||||
// Everything is banned, nothig is banned
|
||||
if guard.len() == self.databases.len() {
|
||||
if guard[shard].len() == self.databases[shard].len() {
|
||||
guard.clear();
|
||||
drop(guard);
|
||||
println!(">> Unbanning all replicas.");
|
||||
@@ -158,12 +164,12 @@ impl ConnectionPool {
|
||||
}
|
||||
|
||||
// I expect this to miss 99.9999% of the time.
|
||||
match guard.get(address) {
|
||||
match guard[shard].get(address) {
|
||||
Some(timestamp) => {
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
if now.timestamp() - timestamp.timestamp() > BAN_TIME {
|
||||
// 1 minute
|
||||
guard.remove(address);
|
||||
guard[shard].remove(address);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
|
||||
Reference in New Issue
Block a user