From 3349cecc1897e5e5727ffe0ba4edb31c5a000f15 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Thu, 27 Feb 2025 13:17:00 -0600 Subject: [PATCH] Add checkout_failure_limit config/feature (#911) In a high availability deployment of PgCat, it is possible that a client may land on a container of PgCat that is very busy with clients and as such the new client might be perpetually stuck in a checkout failure loop because all connections are used by other clients. This is especially true in session mode pools with long-lived client connections (e.g. FDW connections). One way to fix this issue is to close client connections after they encounter some number of checkout failures. This will force the client to hit the network load balancer again, land on a different process/container, and try to check out a connection on the new process/container. If it fails, it is disconnected and tries with another one. This mechanism is guaranteed to eventually land on a balanced state where all clients are able to find connections provided that the overall number of connections across all containers matches the number of clients. I was able to reproduce this issue in a controlled environment and was able to show this PR is able to fix it. --- CONFIG.md | 13 ++++++ src/client.rs | 22 +++++++++- src/config.rs | 20 +++++++++ src/pool.rs | 10 +++++ src/query_router.rs | 2 + tests/ruby/misc_spec.rb | 96 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 162 insertions(+), 1 deletion(-) diff --git a/CONFIG.md b/CONFIG.md index b6e16a7..a40dec3 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -298,6 +298,19 @@ Load balancing mode `random` selects the server at random `loc` selects the server with the least outstanding busy connections +### checkout_failure_limit +``` +path: pools..checkout_failure_limit +default: 0 (disabled) +``` + +`Maximum number of checkout failures a client is allowed before it +gets disconnected. 
This is needed to prevent persistent client/server +imbalance in high availability setups where multiple PgCat instances are placed +behind a single load balancer. If for any reason a client lands on a PgCat instance that has +a large number of connected clients, it might get stuck in perpetual checkout failure loop especially +in session mode +` ### default_role ``` path: pools..default_role diff --git a/src/client.rs b/src/client.rs index c226436..c72e9d2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -859,6 +859,8 @@ where // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); + let mut checkout_failure_count: u64 = 0; + self.stats.register(self.stats.clone()); // Result returned by one of the plugins. @@ -1108,7 +1110,25 @@ where query_router.role(), err ); - + checkout_failure_count += 1; + if let Some(limit) = pool.settings.checkout_failure_limit { + if checkout_failure_count >= limit { + error!( + "Checkout failure limit reached ({} / {}) - disconnecting client", + checkout_failure_count, limit + ); + error_response_terminal( + &mut self.write, + &format!( + "checkout failure limit reached ({} / {})", + checkout_failure_count, limit + ), + ) + .await?; + self.stats.disconnect(); + return Ok(()); + } + } continue; } }; diff --git a/src/config.rs b/src/config.rs index 39eb115..e56f92b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -558,6 +558,14 @@ pub struct Pool { /// Close idle connections that have been opened for longer than this. pub idle_timeout: Option, + /// Maximum number of checkout failures a client is allowed before it + /// gets disconnected. This is needed to prevent persistent client/server + /// imbalance in high availability setups where multiple PgCat instances are placed + /// behind a single load balancer. 
If for any reason a client lands on a PgCat instance that has + /// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially + /// in session mode + pub checkout_failure_limit: Option, + /// Close server connections that have been opened for longer than this. /// Only applied to idle connections. If the connection is actively used for /// longer than this period, the pool will not interrupt it. @@ -782,6 +790,7 @@ impl Default for Pool { Pool { pool_mode: Self::default_pool_mode(), load_balancing_mode: Self::default_load_balancing_mode(), + checkout_failure_limit: None, default_role: String::from("any"), query_parser_enabled: false, query_parser_max_length: None, @@ -1298,6 +1307,17 @@ impl Config { None => self.general.idle_timeout, }; info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout); + match pool_config.checkout_failure_limit { + Some(checkout_failure_limit) => { + info!( + "[pool: {}] Checkout failure limit: {}", + pool_name, checkout_failure_limit + ); + } + None => { + info!("[pool: {}] Checkout failure limit: not set", pool_name); + } + }; info!( "[pool: {}] Sharding function: {}", pool_name, diff --git a/src/pool.rs b/src/pool.rs index e3ddc43..7ecf24c 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -152,6 +152,14 @@ pub struct PoolSettings { /// Random or LeastOutstandingConnections. pub load_balancing_mode: LoadBalancingMode, + /// Maximum number of checkout failures a client is allowed before it + /// gets disconnected. This is needed to prevent persistent client/server + /// imbalance in high availability setups where multiple PgCat instances are placed + /// behind a single load balancer. If for any reason a client lands on a PgCat instance that has + /// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially + /// in session mode + pub checkout_failure_limit: Option, + // Number of shards. 
pub shards: usize, @@ -227,6 +235,7 @@ impl Default for PoolSettings { PoolSettings { pool_mode: PoolMode::Transaction, load_balancing_mode: LoadBalancingMode::Random, + checkout_failure_limit: None, shards: 1, user: User::default(), db: String::default(), @@ -537,6 +546,7 @@ impl ConnectionPool { None => pool_config.pool_mode, }, load_balancing_mode: pool_config.load_balancing_mode, + checkout_failure_limit: pool_config.checkout_failure_limit, // shards: pool_config.shards.clone(), shards: shard_ids.len(), user: user.clone(), diff --git a/src/query_router.rs b/src/query_router.rs index 16683b7..a996e66 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -1617,6 +1617,7 @@ mod test { let pool_settings = PoolSettings { pool_mode: PoolMode::Transaction, load_balancing_mode: crate::config::LoadBalancingMode::Random, + checkout_failure_limit: None, shards: 2, user: crate::config::User::default(), default_role: Some(Role::Replica), @@ -1699,6 +1700,7 @@ mod test { let pool_settings = PoolSettings { pool_mode: PoolMode::Transaction, load_balancing_mode: crate::config::LoadBalancingMode::Random, + checkout_failure_limit: Some(10), shards: 5, user: crate::config::User::default(), default_role: Some(Role::Replica), diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index aa17e8e..7cb0dc0 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -188,6 +188,102 @@ describe "Miscellaneous" do end end + describe "Checkout failure limit" do + context "when no checkout failure limit is set" do + before do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 200 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + sleep 0.5 + end + + it "does not disconnect client" do + Array.new(5) do + Thread.new do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + for i in 0..4 + begin + 
conn.async_exec("SELECT pg_sleep(0.5);") + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::SystemError + expect(conn.status).to eq(PG::CONNECTION_OK) + end + end + conn.close + end + end.each(&:join) + end + end + + context "when checkout failure limit is set high" do + before do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 200 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 10000 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + sleep 0.5 + end + + it "does not disconnect client" do + Array.new(5) do + Thread.new do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + for i in 0..4 + begin + conn.async_exec("SELECT pg_sleep(0.5);") + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::SystemError + expect(conn.status).to eq(PG::CONNECTION_OK) + end + end + conn.close + end + end.each(&:join) + end + end + + context "when checkout failure limit is set low" do + before do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 200 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 2 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + sleep 0.5 + end + + it "disconnects client after reaching limit" do + Array.new(5) do + Thread.new do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + checkout_failure_count = 0 + for i in 0..4 + begin + conn.async_exec("SELECT pg_sleep(1);") + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::SystemError + checkout_failure_count += 1 + expect(conn.status).to eq(PG::CONNECTION_OK) + rescue PG::ConnectionBad + expect(checkout_failure_count).to eq(2) + expect(conn.status).to eq(PG::CONNECTION_BAD) + break + end + end + conn.close + end + 
end.each(&:join) + puts processes.pgcat.logs + + end + end + end + describe "Server version reporting" do it "reports correct version for normal and admin databases" do server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))