pool fixes

This commit is contained in:
Lev Kokotov
2022-02-10 08:54:06 -08:00
parent daf120aeac
commit 8209633e05
2 changed files with 31 additions and 5 deletions

View File

@@ -73,6 +73,7 @@ async fn main() {
"> Healthcheck timeout: {}ms",
config.general.healthcheck_timeout
);
println!("> Connection timeout: {}ms", config.general.connect_timeout);
let pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await;
let transaction_mode = config.general.pool_mode == "transaction";

View File

@@ -26,6 +26,7 @@ pub struct ConnectionPool {
banlist: BanList,
healthcheck_timeout: u64,
ban_time: i64,
pool_size: u32,
}
impl ConnectionPool {
@@ -96,6 +97,7 @@ impl ConnectionPool {
banlist: Arc::new(Mutex::new(banlist)),
healthcheck_timeout: config.general.healthcheck_timeout,
ban_time: config.general.ban_time,
pool_size: config.general.pool_size,
}
}
@@ -115,12 +117,29 @@ impl ConnectionPool {
let mut allowed_attempts = match role {
// Primary-specific queries get one attempt, if the primary is down,
// nothing we can do.
Some(Role::Primary) => 1,
// nothing we should do about it I think. It's dangerous to retry
// write queries.
Some(Role::Primary) => {
// Make sure we have a primary in the pool configured.
let primary_present = self.addresses[shard]
.iter()
.filter(|&db| db.role == Role::Primary)
.count();
// Replicas get to try as many times as there are replicas.
Some(Role::Replica) => self.databases[shard].len(),
None => self.databases[shard].len(),
// TODO: return this error to the client, so people don't have to look in
// the logs to figure out what happened.
if primary_present == 0 {
println!(">> Error: Primary requested but none are configured.");
return Err(Error::AllServersDown);
}
// Primary gets one attempt.
1
}
// Replicas get to try as many times as there are replicas
// and connections in the pool.
_ => self.databases[shard].len() * self.pool_size as usize,
};
while allowed_attempts > 0 {
@@ -184,6 +203,9 @@ impl ConnectionPool {
">> Banning replica {} because of failed health check",
index
);
// Don't leave a bad connection in the pool.
server.mark_bad();
self.ban(&address, shard);
continue;
}
@@ -194,6 +216,9 @@ impl ConnectionPool {
">> Banning replica {} because of health check timeout",
index
);
// Don't leave a bad connection in the pool.
server.mark_bad();
self.ban(&address, shard);
continue;
}