diff --git a/CONFIG.md b/CONFIG.md index b6e16a7..a40dec3 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -298,6 +298,19 @@ Load balancing mode `random` selects the server at random `loc` selects the server with the least outstanding busy connections +### checkout_failure_limit +``` +path: pools..checkout_failure_limit +default: 0 (disabled) +``` + +`Maximum number of checkout failures a client is allowed before it +gets disconnected. This is needed to prevent persistent client/server +imbalance in high availability setups where multiple PgCat instances are placed +behind a single load balancer. If for any reason a client lands on a PgCat instance that has +a large number of connected clients, it might get stuck in perpetual checkout failure loop especially +in session mode +` ### default_role ``` path: pools..default_role diff --git a/src/client.rs b/src/client.rs index c226436..c72e9d2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -859,6 +859,8 @@ where // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); + let mut checkout_failure_count: u64 = 0; + self.stats.register(self.stats.clone()); // Result returned by one of the plugins. @@ -1108,7 +1110,25 @@ where query_router.role(), err ); - + checkout_failure_count += 1; + if let Some(limit) = pool.settings.checkout_failure_limit { + if checkout_failure_count >= limit { + error!( + "Checkout failure limit reached ({} / {}) - disconnecting client", + checkout_failure_count, limit + ); + error_response_terminal( + &mut self.write, + &format!( + "checkout failure limit reached ({} / {})", + checkout_failure_count, limit + ), + ) + .await?; + self.stats.disconnect(); + return Ok(()); + } + } continue; } }; diff --git a/src/config.rs b/src/config.rs index 39eb115..e56f92b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -558,6 +558,14 @@ pub struct Pool { /// Close idle connections that have been opened for longer than this. pub idle_timeout: Option, + /// Maximum number of checkout failures a client is allowed before it + /// gets disconnected. This is needed to prevent persistent client/server + /// imbalance in high availability setups where multiple PgCat instances are placed + /// behind a single load balancer. If for any reason a client lands on a PgCat instance that has + /// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially + /// in session mode + pub checkout_failure_limit: Option, + /// Close server connections that have been opened for longer than this. /// Only applied to idle connections. If the connection is actively used for /// longer than this period, the pool will not interrupt it. @@ -782,6 +790,7 @@ impl Default for Pool { Pool { pool_mode: Self::default_pool_mode(), load_balancing_mode: Self::default_load_balancing_mode(), + checkout_failure_limit: None, default_role: String::from("any"), query_parser_enabled: false, query_parser_max_length: None, @@ -1298,6 +1307,17 @@ impl Config { None => self.general.idle_timeout, }; info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout); + match pool_config.checkout_failure_limit { + Some(checkout_failure_limit) => { + info!( + "[pool: {}] Checkout failure limit: {}", + pool_name, checkout_failure_limit + ); + } + None => { + info!("[pool: {}] Checkout failure limit: not set", pool_name); + } + }; info!( "[pool: {}] Sharding function: {}", pool_name, diff --git a/src/pool.rs b/src/pool.rs index e3ddc43..7ecf24c 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -152,6 +152,14 @@ pub struct PoolSettings { /// Random or LeastOutstandingConnections. pub load_balancing_mode: LoadBalancingMode, + /// Maximum number of checkout failures a client is allowed before it + /// gets disconnected. This is needed to prevent persistent client/server + /// imbalance in high availability setups where multiple PgCat instances are placed + /// behind a single load balancer. If for any reason a client lands on a PgCat instance that has + /// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially + /// in session mode + pub checkout_failure_limit: Option, + // Number of shards. pub shards: usize, @@ -227,6 +235,7 @@ impl Default for PoolSettings { PoolSettings { pool_mode: PoolMode::Transaction, load_balancing_mode: LoadBalancingMode::Random, + checkout_failure_limit: None, shards: 1, user: User::default(), db: String::default(), @@ -537,6 +546,7 @@ impl ConnectionPool { None => pool_config.pool_mode, }, load_balancing_mode: pool_config.load_balancing_mode, + checkout_failure_limit: pool_config.checkout_failure_limit, // shards: pool_config.shards.clone(), shards: shard_ids.len(), user: user.clone(), diff --git a/src/query_router.rs b/src/query_router.rs index 16683b7..a996e66 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -1617,6 +1617,7 @@ mod test { let pool_settings = PoolSettings { pool_mode: PoolMode::Transaction, load_balancing_mode: crate::config::LoadBalancingMode::Random, + checkout_failure_limit: None, shards: 2, user: crate::config::User::default(), default_role: Some(Role::Replica), @@ -1699,6 +1700,7 @@ mod test { let pool_settings = PoolSettings { pool_mode: PoolMode::Transaction, load_balancing_mode: crate::config::LoadBalancingMode::Random, + checkout_failure_limit: Some(10), shards: 5, user: crate::config::User::default(), default_role: Some(Role::Replica), diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index aa17e8e..7cb0dc0 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -188,6 +188,102 @@ describe "Miscellaneous" do end end + describe "Checkout failure limit" do + context "when no checkout failure limit is set" do + before do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 200 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + sleep 0.5 + end + + it "does not disconnect client" do + Array.new(5) do + Thread.new do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + for i in 0..4 + begin + conn.async_exec("SELECT pg_sleep(0.5);") + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::SystemError + expect(conn.status).to eq(PG::CONNECTION_OK) + end + end + conn.close + end + end.each(&:join) + end + end + + context "when checkout failure limit is set high" do + before do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 200 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 10000 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + sleep 0.5 + end + + it "does not disconnect client" do + Array.new(5) do + Thread.new do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + for i in 0..4 + begin + conn.async_exec("SELECT pg_sleep(0.5);") + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::SystemError + expect(conn.status).to eq(PG::CONNECTION_OK) + end + end + conn.close + end + end.each(&:join) + end + end + + context "when checkout failure limit is set low" do + before do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 200 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 2 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + sleep 0.5 + end + + it "disconnects client after reaching limit" do + Array.new(5) do + Thread.new do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + checkout_failure_count = 0 + for i in 0..4 + begin + conn.async_exec("SELECT pg_sleep(1);") + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::SystemError + checkout_failure_count += 1 + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::ConnectionBad + expect(checkout_failure_count).to eq(2) + expect(conn.status).to eq(PG::CONNECTION_BAD) + break + end + end + conn.close + end + end.each(&:join) + puts processes.pgcat.logs + + end + end + end + describe "Server version reporting" do it "reports correct version for normal and admin databases" do server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))