diff --git a/src/main.rs b/src/main.rs index 9b084ec..3673e0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -73,6 +73,7 @@ async fn main() { "> Healthcheck timeout: {}ms", config.general.healthcheck_timeout ); + println!("> Connection timeout: {}ms", config.general.connect_timeout); let pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await; let transaction_mode = config.general.pool_mode == "transaction"; diff --git a/src/pool.rs b/src/pool.rs index 49e13e5..8cf3ada 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -26,6 +26,7 @@ pub struct ConnectionPool { banlist: BanList, healthcheck_timeout: u64, ban_time: i64, + pool_size: u32, } impl ConnectionPool { @@ -96,6 +97,7 @@ impl ConnectionPool { banlist: Arc::new(Mutex::new(banlist)), healthcheck_timeout: config.general.healthcheck_timeout, ban_time: config.general.ban_time, + pool_size: config.general.pool_size, } } @@ -115,12 +117,29 @@ impl ConnectionPool { let mut allowed_attempts = match role { // Primary-specific queries get one attempt, if the primary is down, - // nothing we can do. - Some(Role::Primary) => 1, + // nothing we should do about it I think. It's dangerous to retry + // write queries. + Some(Role::Primary) => { + // Make sure we have a primary in the pool configured. + let primary_present = self.addresses[shard] + .iter() + .filter(|&db| db.role == Role::Primary) + .count(); - // Replicas get to try as many times as there are replicas. - Some(Role::Replica) => self.databases[shard].len(), - None => self.databases[shard].len(), + // TODO: return this error to the client, so people don't have to look in + // the logs to figure out what happened. + if primary_present == 0 { + println!(">> Error: Primary requested but none are configured."); + return Err(Error::AllServersDown); + } + + // Primary gets one attempt. + 1 + } + + // Replicas get to try as many times as there are replicas + // and connections in the pool. + _ => self.databases[shard].len() * self.pool_size as usize, }; while allowed_attempts > 0 { @@ -184,6 +203,9 @@ impl ConnectionPool { ">> Banning replica {} because of failed health check", index ); + // Don't leave a bad connection in the pool. + server.mark_bad(); + self.ban(&address, shard); continue; } @@ -194,6 +216,9 @@ impl ConnectionPool { ">> Banning replica {} because of health check timeout", index ); + // Don't leave a bad connection in the pool. + server.mark_bad(); + self.ban(&address, shard); continue; }