Compare commits

..

3 Commits

Author SHA1 Message Date
Mostafa
ef811c4223 fix 2025-01-23 21:25:03 -06:00
Mostafa
527ecebaec fix tests 2025-01-23 21:09:38 -06:00
Mostafa
03c291f171 Avoid holding on to pools after reload 2025-01-23 20:32:48 -06:00
6 changed files with 27 additions and 180 deletions

View File

@@ -298,19 +298,6 @@ Load balancing mode
`random` selects the server at random
`loc` selects the server with the least outstanding busy connections
### checkout_failure_limit
```
path: pools.<pool_name>.checkout_failure_limit
default: 0 (disabled)
```
Maximum number of checkout failures a client is allowed before it
gets disconnected. This is needed to prevent a persistent client/server
imbalance in high-availability setups where multiple PgCat instances are placed
behind a single load balancer. If, for any reason, a client lands on a PgCat instance that has
a large number of connected clients, it might get stuck in a perpetual checkout failure loop,
especially in session mode.

### default_role
```
path: pools.<pool_name>.default_role

View File

@@ -858,8 +858,14 @@ where
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
let mut checkout_failure_count: u64 = 0;
let pool_settings = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default().settings
} else {
self.get_pool().await?.settings
};
query_router.update_pool_settings(&pool_settings);
query_router.set_default_role();
self.stats.register(self.stats.clone());
@@ -872,19 +878,6 @@ where
&self.pool_name,
);
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default()
} else {
self.get_pool().await?
};
query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
@@ -935,6 +928,18 @@ where
continue;
}
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default()
} else {
self.get_pool().await?
};
query_router.update_pool_settings(&pool.settings);
// Handle all custom protocol commands, if any.
if self
.handle_custom_protocol(&mut query_router, &message, &pool)
@@ -1057,11 +1062,12 @@ where
};
// Check if the pool is paused and wait until it's resumed.
pool.wait_paused().await;
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
query_router.update_pool_settings(&pool.settings);
if pool.paused() {
pool.wait_paused().await;
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
query_router.update_pool_settings(&pool.settings);
}
debug!("Waiting for connection from pool");
if !self.admin {
@@ -1110,25 +1116,7 @@ where
query_router.role(),
err
);
checkout_failure_count += 1;
if let Some(limit) = pool.settings.checkout_failure_limit {
if checkout_failure_count >= limit {
error!(
"Checkout failure limit reached ({} / {}) - disconnecting client",
checkout_failure_count, limit
);
error_response_terminal(
&mut self.write,
&format!(
"checkout failure limit reached ({} / {})",
checkout_failure_count, limit
),
)
.await?;
self.stats.disconnect();
return Ok(());
}
}
continue;
}
};

View File

@@ -558,14 +558,6 @@ pub struct Pool {
/// Close idle connections that have been opened for longer than this.
pub idle_timeout: Option<u64>,
/// Maximum number of checkout failures a client is allowed before it
/// gets disconnected. This is needed to prevent a persistent client/server
/// imbalance in high-availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If, for any reason, a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop,
/// especially in session mode.
pub checkout_failure_limit: Option<u64>,
/// Close server connections that have been opened for longer than this.
/// Only applied to idle connections. If the connection is actively used for
/// longer than this period, the pool will not interrupt it.
@@ -790,7 +782,6 @@ impl Default for Pool {
Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
checkout_failure_limit: None,
default_role: String::from("any"),
query_parser_enabled: false,
query_parser_max_length: None,
@@ -1307,17 +1298,6 @@ impl Config {
None => self.general.idle_timeout,
};
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
match pool_config.checkout_failure_limit {
Some(checkout_failure_limit) => {
info!(
"[pool: {}] Checkout failure limit: {}",
pool_name, checkout_failure_limit
);
}
None => {
info!("[pool: {}] Checkout failure limit: not set", pool_name);
}
};
info!(
"[pool: {}] Sharding function: {}",
pool_name,

View File

@@ -152,14 +152,6 @@ pub struct PoolSettings {
/// Random or LeastOutstandingConnections.
pub load_balancing_mode: LoadBalancingMode,
/// Maximum number of checkout failures a client is allowed before it
/// gets disconnected. This is needed to prevent a persistent client/server
/// imbalance in high-availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If, for any reason, a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop,
/// especially in session mode.
pub checkout_failure_limit: Option<u64>,
// Number of shards.
pub shards: usize,
@@ -235,7 +227,6 @@ impl Default for PoolSettings {
PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 1,
user: User::default(),
db: String::default(),
@@ -546,7 +537,6 @@ impl ConnectionPool {
None => pool_config.pool_mode,
},
load_balancing_mode: pool_config.load_balancing_mode,
checkout_failure_limit: pool_config.checkout_failure_limit,
// shards: pool_config.shards.clone(),
shards: shard_ids.len(),
user: user.clone(),

View File

@@ -1617,7 +1617,6 @@ mod test {
let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 2,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
@@ -1700,7 +1699,6 @@ mod test {
let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: Some(10),
shards: 5,
user: crate::config::User::default(),
default_role: Some(Role::Replica),

View File

@@ -188,102 +188,6 @@ describe "Miscellaneous" do
end
end
describe "Checkout failure limit" do
  # Rewrites the running PgCat config for these examples: sets
  # general.connect_timeout to 200 and the sharded_db user "0" pool_size to 1,
  # then reloads. When `limit` is non-nil it is written to the pool's
  # checkout_failure_limit; when nil the key is left untouched.
  # NOTE(review): presumably the tiny pool plus short timeout is what forces
  # concurrent clients into checkout failures -- confirm against PgCat docs.
  def apply_checkout_failure_config(limit)
    new_configs = processes.pgcat.current_config
    new_configs["general"]["connect_timeout"] = 200
    new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
    new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = limit unless limit.nil?
    processes.pgcat.update_config(new_configs)
    processes.pgcat.reload_config
    # Pause after the reload before the example starts issuing queries.
    sleep 0.5
  end

  # Starts five threads, each opening its own connection to sharded_db as
  # sharding_user, yields the connection to the caller's block, closes it,
  # and finally joins all threads (a failed expectation inside a thread is
  # re-raised by the join).
  def with_concurrent_clients
    threads = Array.new(5) do
      Thread.new do
        conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        yield conn
        conn.close
      end
    end
    threads.each(&:join)
  end

  # Each client runs five short sleeps; whether a query succeeds or raises
  # PG::SystemError, the client connection itself must remain usable.
  def expect_clients_stay_connected
    with_concurrent_clients do |conn|
      5.times do
        begin
          conn.async_exec("SELECT pg_sleep(0.5);")
          expect(conn.status).to eq(PG::CONNECTION_OK)
        rescue PG::SystemError
          expect(conn.status).to eq(PG::CONNECTION_OK)
        end
      end
    end
  end

  context "when no checkout failure limit is set" do
    before { apply_checkout_failure_config(nil) }

    it "does not disconnect client" do
      expect_clients_stay_connected
    end
  end

  context "when checkout failure limit is set high" do
    before { apply_checkout_failure_config(10000) }

    it "does not disconnect client" do
      expect_clients_stay_connected
    end
  end

  context "when checkout failure limit is set low" do
    before { apply_checkout_failure_config(2) }

    it "disconnects client after reaching limit" do
      with_concurrent_clients do |conn|
        checkout_failure_count = 0
        5.times do
          begin
            conn.async_exec("SELECT pg_sleep(1);")
            expect(conn.status).to eq(PG::CONNECTION_OK)
          rescue PG::SystemError
            # A failed query that leaves the connection open is counted as
            # one checkout failure.
            checkout_failure_count += 1
            expect(conn.status).to eq(PG::CONNECTION_OK)
          rescue PG::ConnectionBad
            # The connection is dropped only once the configured limit (2)
            # of failures has been observed; stop querying afterwards.
            expect(checkout_failure_count).to eq(2)
            expect(conn.status).to eq(PG::CONNECTION_BAD)
            break
          end
        end
      end
    end
  end
end
describe "Server version reporting" do
it "reports correct version for normal and admin databases" do
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))