mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
3 Commits
mostafa_av
...
mostafa_ch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e299a2e71a | ||
|
|
00ac44427f | ||
|
|
147eba52c0 |
13
CONFIG.md
13
CONFIG.md
@@ -298,6 +298,19 @@ Load balancing mode
|
||||
`random` selects the server at random
|
||||
`loc` selects the server with the least outstanding busy connections
|
||||
|
||||
### checkout_failure_limit
|
||||
```
|
||||
path: pools.<pool_name>.checkout_failure_limit
|
||||
default: not set (disabled)
|
||||
```
|
||||
|
||||
`Maximum number of checkout failures a client is allowed before it
|
||||
gets disconnected. This is needed to prevent persistent client/server
|
||||
imbalance in high availability setups where multiple PgCat instances are placed
|
||||
behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
||||
in session mode
|
||||
`
|
||||
### default_role
|
||||
```
|
||||
path: pools.<pool_name>.default_role
|
||||
|
||||
@@ -858,14 +858,8 @@ where
|
||||
// The query router determines where the query is going to go,
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
let pool_settings = if self.admin {
|
||||
// Admin clients do not use pools.
|
||||
ConnectionPool::default().settings
|
||||
} else {
|
||||
self.get_pool().await?.settings
|
||||
};
|
||||
query_router.update_pool_settings(&pool_settings);
|
||||
query_router.set_default_role();
|
||||
|
||||
let mut checkout_failure_count: u64 = 0;
|
||||
|
||||
self.stats.register(self.stats.clone());
|
||||
|
||||
@@ -878,6 +872,19 @@ where
|
||||
&self.pool_name,
|
||||
);
|
||||
|
||||
// Get a pool instance referenced by the most up-to-date
|
||||
// pointer. This ensures we always read the latest config
|
||||
// when starting a query.
|
||||
let mut pool = if self.admin {
|
||||
// Admin clients do not use pools.
|
||||
ConnectionPool::default()
|
||||
} else {
|
||||
self.get_pool().await?
|
||||
};
|
||||
|
||||
query_router.update_pool_settings(&pool.settings);
|
||||
query_router.set_default_role();
|
||||
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
// or issue commands for our sharding and server selection protocol.
|
||||
@@ -928,18 +935,6 @@ where
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get a pool instance referenced by the most up-to-date
|
||||
// pointer. This ensures we always read the latest config
|
||||
// when starting a query.
|
||||
let mut pool = if self.admin {
|
||||
// Admin clients do not use pools.
|
||||
ConnectionPool::default()
|
||||
} else {
|
||||
self.get_pool().await?
|
||||
};
|
||||
|
||||
query_router.update_pool_settings(&pool.settings);
|
||||
|
||||
// Handle all custom protocol commands, if any.
|
||||
if self
|
||||
.handle_custom_protocol(&mut query_router, &message, &pool)
|
||||
@@ -1062,12 +1057,11 @@ where
|
||||
};
|
||||
|
||||
// Check if the pool is paused and wait until it's resumed.
|
||||
if pool.paused() {
|
||||
pool.wait_paused().await;
|
||||
// Refresh pool information, something might have changed.
|
||||
pool = self.get_pool().await?;
|
||||
query_router.update_pool_settings(&pool.settings);
|
||||
}
|
||||
pool.wait_paused().await;
|
||||
|
||||
// Refresh pool information, something might have changed.
|
||||
pool = self.get_pool().await?;
|
||||
query_router.update_pool_settings(&pool.settings);
|
||||
|
||||
debug!("Waiting for connection from pool");
|
||||
if !self.admin {
|
||||
@@ -1116,7 +1110,25 @@ where
|
||||
query_router.role(),
|
||||
err
|
||||
);
|
||||
|
||||
checkout_failure_count += 1;
|
||||
if let Some(limit) = pool.settings.checkout_failure_limit {
|
||||
if checkout_failure_count >= limit {
|
||||
error!(
|
||||
"Checkout failure limit reached ({} / {}) - disconnecting client",
|
||||
checkout_failure_count, limit
|
||||
);
|
||||
error_response_terminal(
|
||||
&mut self.write,
|
||||
&format!(
|
||||
"checkout failure limit reached ({} / {})",
|
||||
checkout_failure_count, limit
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
self.stats.disconnect();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -558,6 +558,14 @@ pub struct Pool {
|
||||
/// Close idle connections that have been opened for longer than this.
|
||||
pub idle_timeout: Option<u64>,
|
||||
|
||||
/// Maximum number of checkout failures a client is allowed before it
|
||||
/// gets disconnected. This is needed to prevent persistent client/server
|
||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
||||
/// in session mode
|
||||
pub checkout_failure_limit: Option<u64>,
|
||||
|
||||
/// Close server connections that have been opened for longer than this.
|
||||
/// Only applied to idle connections. If the connection is actively used for
|
||||
/// longer than this period, the pool will not interrupt it.
|
||||
@@ -782,6 +790,7 @@ impl Default for Pool {
|
||||
Pool {
|
||||
pool_mode: Self::default_pool_mode(),
|
||||
load_balancing_mode: Self::default_load_balancing_mode(),
|
||||
checkout_failure_limit: None,
|
||||
default_role: String::from("any"),
|
||||
query_parser_enabled: false,
|
||||
query_parser_max_length: None,
|
||||
@@ -1298,6 +1307,17 @@ impl Config {
|
||||
None => self.general.idle_timeout,
|
||||
};
|
||||
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
|
||||
match pool_config.checkout_failure_limit {
|
||||
Some(checkout_failure_limit) => {
|
||||
info!(
|
||||
"[pool: {}] Checkout failure limit: {}",
|
||||
pool_name, checkout_failure_limit
|
||||
);
|
||||
}
|
||||
None => {
|
||||
info!("[pool: {}] Checkout failure limit: not set", pool_name);
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"[pool: {}] Sharding function: {}",
|
||||
pool_name,
|
||||
|
||||
10
src/pool.rs
10
src/pool.rs
@@ -152,6 +152,14 @@ pub struct PoolSettings {
|
||||
/// Random or LeastOutstandingConnections.
|
||||
pub load_balancing_mode: LoadBalancingMode,
|
||||
|
||||
/// Maximum number of checkout failures a client is allowed before it
|
||||
/// gets disconnected. This is needed to prevent persistent client/server
|
||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
||||
/// in session mode
|
||||
pub checkout_failure_limit: Option<u64>,
|
||||
|
||||
// Number of shards.
|
||||
pub shards: usize,
|
||||
|
||||
@@ -227,6 +235,7 @@ impl Default for PoolSettings {
|
||||
PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: LoadBalancingMode::Random,
|
||||
checkout_failure_limit: None,
|
||||
shards: 1,
|
||||
user: User::default(),
|
||||
db: String::default(),
|
||||
@@ -537,6 +546,7 @@ impl ConnectionPool {
|
||||
None => pool_config.pool_mode,
|
||||
},
|
||||
load_balancing_mode: pool_config.load_balancing_mode,
|
||||
checkout_failure_limit: pool_config.checkout_failure_limit,
|
||||
// shards: pool_config.shards.clone(),
|
||||
shards: shard_ids.len(),
|
||||
user: user.clone(),
|
||||
|
||||
@@ -1617,6 +1617,7 @@ mod test {
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||
checkout_failure_limit: None,
|
||||
shards: 2,
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
@@ -1699,6 +1700,7 @@ mod test {
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||
checkout_failure_limit: Some(10),
|
||||
shards: 5,
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
|
||||
@@ -188,6 +188,102 @@ describe "Miscellaneous" do
|
||||
end
|
||||
end
|
||||
|
||||
describe "Checkout failure limit" do
|
||||
context "when no checkout failure limit is set" do
|
||||
before do
|
||||
new_configs = processes.pgcat.current_config
|
||||
new_configs["general"]["connect_timeout"] = 200
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
end
|
||||
|
||||
it "does not disconnect client" do
|
||||
Array.new(5) do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
for i in 0..4
|
||||
begin
|
||||
conn.async_exec("SELECT pg_sleep(0.5);")
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::SystemError
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
end
|
||||
end
|
||||
conn.close
|
||||
end
|
||||
end.each(&:join)
|
||||
end
|
||||
end
|
||||
|
||||
context "when checkout failure limit is set high" do
|
||||
before do
|
||||
new_configs = processes.pgcat.current_config
|
||||
new_configs["general"]["connect_timeout"] = 200
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 10000
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
end
|
||||
|
||||
it "does not disconnect client" do
|
||||
Array.new(5) do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
for i in 0..4
|
||||
begin
|
||||
conn.async_exec("SELECT pg_sleep(0.5);")
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::SystemError
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
end
|
||||
end
|
||||
conn.close
|
||||
end
|
||||
end.each(&:join)
|
||||
end
|
||||
end
|
||||
|
||||
context "when checkout failure limit is set low" do
|
||||
before do
|
||||
new_configs = processes.pgcat.current_config
|
||||
new_configs["general"]["connect_timeout"] = 200
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 2
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
end
|
||||
|
||||
it "disconnects client after reaching limit" do
|
||||
Array.new(5) do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
checkout_failure_count = 0
|
||||
for i in 0..4
|
||||
begin
|
||||
conn.async_exec("SELECT pg_sleep(1);")
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::SystemError
|
||||
checkout_failure_count += 1
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::ConnectionBad
|
||||
expect(checkout_failure_count).to eq(2)
|
||||
expect(conn.status).to eq(PG::CONNECTION_BAD)
|
||||
break
|
||||
end
|
||||
end
|
||||
conn.close
|
||||
end
|
||||
end.each(&:join)
|
||||
puts processes.pgcat.logs
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Server version reporting" do
|
||||
it "reports correct version for normal and admin databases" do
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
Reference in New Issue
Block a user