mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 18:36:28 +00:00
Compare commits
3 Commits
mostafa_bu
...
mostafa_av
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ef811c4223 | ||
|
|
527ecebaec | ||
|
|
03c291f171 |
13
CONFIG.md
13
CONFIG.md
@@ -298,19 +298,6 @@ Load balancing mode
|
|||||||
`random` selects the server at random
|
`random` selects the server at random
|
||||||
`loc` selects the server with the least outstanding busy connections
|
`loc` selects the server with the least outstanding busy connections
|
||||||
|
|
||||||
### checkout_failure_limit
|
|
||||||
```
|
|
||||||
path: pools.<pool_name>.checkout_failure_limit
|
|
||||||
default: 0 (disabled)
|
|
||||||
```
|
|
||||||
|
|
||||||
`Maximum number of checkout failures a client is allowed before it
|
|
||||||
gets disconnected. This is needed to prevent persistent client/server
|
|
||||||
imbalance in high availability setups where multiple PgCat instances are placed
|
|
||||||
behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
|
||||||
a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
|
||||||
in session mode
|
|
||||||
`
|
|
||||||
### default_role
|
### default_role
|
||||||
```
|
```
|
||||||
path: pools.<pool_name>.default_role
|
path: pools.<pool_name>.default_role
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM rust:1.81.0-slim-bookworm AS builder
|
FROM rust:1.79.0-slim-bookworm AS builder
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get install -y build-essential
|
apt-get install -y build-essential
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM cimg/rust:1.81.0
|
FROM cimg/rust:1.79.0
|
||||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||||
RUN /bin/yj -h
|
RUN /bin/yj -h
|
||||||
RUN sudo apt-get update && \
|
RUN sudo apt-get update && \
|
||||||
|
|||||||
@@ -858,8 +858,14 @@ where
|
|||||||
// The query router determines where the query is going to go,
|
// The query router determines where the query is going to go,
|
||||||
// e.g. primary, replica, which shard.
|
// e.g. primary, replica, which shard.
|
||||||
let mut query_router = QueryRouter::new();
|
let mut query_router = QueryRouter::new();
|
||||||
|
let pool_settings = if self.admin {
|
||||||
let mut checkout_failure_count: u64 = 0;
|
// Admin clients do not use pools.
|
||||||
|
ConnectionPool::default().settings
|
||||||
|
} else {
|
||||||
|
self.get_pool().await?.settings
|
||||||
|
};
|
||||||
|
query_router.update_pool_settings(&pool_settings);
|
||||||
|
query_router.set_default_role();
|
||||||
|
|
||||||
self.stats.register(self.stats.clone());
|
self.stats.register(self.stats.clone());
|
||||||
|
|
||||||
@@ -872,19 +878,6 @@ where
|
|||||||
&self.pool_name,
|
&self.pool_name,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Get a pool instance referenced by the most up-to-date
|
|
||||||
// pointer. This ensures we always read the latest config
|
|
||||||
// when starting a query.
|
|
||||||
let mut pool = if self.admin {
|
|
||||||
// Admin clients do not use pools.
|
|
||||||
ConnectionPool::default()
|
|
||||||
} else {
|
|
||||||
self.get_pool().await?
|
|
||||||
};
|
|
||||||
|
|
||||||
query_router.update_pool_settings(&pool.settings);
|
|
||||||
query_router.set_default_role();
|
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
// or issue commands for our sharding and server selection protocol.
|
// or issue commands for our sharding and server selection protocol.
|
||||||
@@ -935,6 +928,18 @@ where
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get a pool instance referenced by the most up-to-date
|
||||||
|
// pointer. This ensures we always read the latest config
|
||||||
|
// when starting a query.
|
||||||
|
let mut pool = if self.admin {
|
||||||
|
// Admin clients do not use pools.
|
||||||
|
ConnectionPool::default()
|
||||||
|
} else {
|
||||||
|
self.get_pool().await?
|
||||||
|
};
|
||||||
|
|
||||||
|
query_router.update_pool_settings(&pool.settings);
|
||||||
|
|
||||||
// Handle all custom protocol commands, if any.
|
// Handle all custom protocol commands, if any.
|
||||||
if self
|
if self
|
||||||
.handle_custom_protocol(&mut query_router, &message, &pool)
|
.handle_custom_protocol(&mut query_router, &message, &pool)
|
||||||
@@ -1057,11 +1062,12 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Check if the pool is paused and wait until it's resumed.
|
// Check if the pool is paused and wait until it's resumed.
|
||||||
pool.wait_paused().await;
|
if pool.paused() {
|
||||||
|
pool.wait_paused().await;
|
||||||
// Refresh pool information, something might have changed.
|
// Refresh pool information, something might have changed.
|
||||||
pool = self.get_pool().await?;
|
pool = self.get_pool().await?;
|
||||||
query_router.update_pool_settings(&pool.settings);
|
query_router.update_pool_settings(&pool.settings);
|
||||||
|
}
|
||||||
|
|
||||||
debug!("Waiting for connection from pool");
|
debug!("Waiting for connection from pool");
|
||||||
if !self.admin {
|
if !self.admin {
|
||||||
@@ -1110,25 +1116,7 @@ where
|
|||||||
query_router.role(),
|
query_router.role(),
|
||||||
err
|
err
|
||||||
);
|
);
|
||||||
checkout_failure_count += 1;
|
|
||||||
if let Some(limit) = pool.settings.checkout_failure_limit {
|
|
||||||
if checkout_failure_count >= limit {
|
|
||||||
error!(
|
|
||||||
"Checkout failure limit reached ({} / {}) - disconnecting client",
|
|
||||||
checkout_failure_count, limit
|
|
||||||
);
|
|
||||||
error_response_terminal(
|
|
||||||
&mut self.write,
|
|
||||||
&format!(
|
|
||||||
"checkout failure limit reached ({} / {})",
|
|
||||||
checkout_failure_count, limit
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
self.stats.disconnect();
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -558,14 +558,6 @@ pub struct Pool {
|
|||||||
/// Close idle connections that have been opened for longer than this.
|
/// Close idle connections that have been opened for longer than this.
|
||||||
pub idle_timeout: Option<u64>,
|
pub idle_timeout: Option<u64>,
|
||||||
|
|
||||||
/// Maximum number of checkout failures a client is allowed before it
|
|
||||||
/// gets disconnected. This is needed to prevent persistent client/server
|
|
||||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
|
||||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
|
||||||
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
|
||||||
/// in session mode
|
|
||||||
pub checkout_failure_limit: Option<u64>,
|
|
||||||
|
|
||||||
/// Close server connections that have been opened for longer than this.
|
/// Close server connections that have been opened for longer than this.
|
||||||
/// Only applied to idle connections. If the connection is actively used for
|
/// Only applied to idle connections. If the connection is actively used for
|
||||||
/// longer than this period, the pool will not interrupt it.
|
/// longer than this period, the pool will not interrupt it.
|
||||||
@@ -790,7 +782,6 @@ impl Default for Pool {
|
|||||||
Pool {
|
Pool {
|
||||||
pool_mode: Self::default_pool_mode(),
|
pool_mode: Self::default_pool_mode(),
|
||||||
load_balancing_mode: Self::default_load_balancing_mode(),
|
load_balancing_mode: Self::default_load_balancing_mode(),
|
||||||
checkout_failure_limit: None,
|
|
||||||
default_role: String::from("any"),
|
default_role: String::from("any"),
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
@@ -1307,17 +1298,6 @@ impl Config {
|
|||||||
None => self.general.idle_timeout,
|
None => self.general.idle_timeout,
|
||||||
};
|
};
|
||||||
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
|
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
|
||||||
match pool_config.checkout_failure_limit {
|
|
||||||
Some(checkout_failure_limit) => {
|
|
||||||
info!(
|
|
||||||
"[pool: {}] Checkout failure limit: {}",
|
|
||||||
pool_name, checkout_failure_limit
|
|
||||||
);
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
info!("[pool: {}] Checkout failure limit: not set", pool_name);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
info!(
|
info!(
|
||||||
"[pool: {}] Sharding function: {}",
|
"[pool: {}] Sharding function: {}",
|
||||||
pool_name,
|
pool_name,
|
||||||
|
|||||||
10
src/pool.rs
10
src/pool.rs
@@ -152,14 +152,6 @@ pub struct PoolSettings {
|
|||||||
/// Random or LeastOutstandingConnections.
|
/// Random or LeastOutstandingConnections.
|
||||||
pub load_balancing_mode: LoadBalancingMode,
|
pub load_balancing_mode: LoadBalancingMode,
|
||||||
|
|
||||||
/// Maximum number of checkout failures a client is allowed before it
|
|
||||||
/// gets disconnected. This is needed to prevent persistent client/server
|
|
||||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
|
||||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
|
||||||
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
|
||||||
/// in session mode
|
|
||||||
pub checkout_failure_limit: Option<u64>,
|
|
||||||
|
|
||||||
// Number of shards.
|
// Number of shards.
|
||||||
pub shards: usize,
|
pub shards: usize,
|
||||||
|
|
||||||
@@ -235,7 +227,6 @@ impl Default for PoolSettings {
|
|||||||
PoolSettings {
|
PoolSettings {
|
||||||
pool_mode: PoolMode::Transaction,
|
pool_mode: PoolMode::Transaction,
|
||||||
load_balancing_mode: LoadBalancingMode::Random,
|
load_balancing_mode: LoadBalancingMode::Random,
|
||||||
checkout_failure_limit: None,
|
|
||||||
shards: 1,
|
shards: 1,
|
||||||
user: User::default(),
|
user: User::default(),
|
||||||
db: String::default(),
|
db: String::default(),
|
||||||
@@ -546,7 +537,6 @@ impl ConnectionPool {
|
|||||||
None => pool_config.pool_mode,
|
None => pool_config.pool_mode,
|
||||||
},
|
},
|
||||||
load_balancing_mode: pool_config.load_balancing_mode,
|
load_balancing_mode: pool_config.load_balancing_mode,
|
||||||
checkout_failure_limit: pool_config.checkout_failure_limit,
|
|
||||||
// shards: pool_config.shards.clone(),
|
// shards: pool_config.shards.clone(),
|
||||||
shards: shard_ids.len(),
|
shards: shard_ids.len(),
|
||||||
user: user.clone(),
|
user: user.clone(),
|
||||||
|
|||||||
@@ -1617,7 +1617,6 @@ mod test {
|
|||||||
let pool_settings = PoolSettings {
|
let pool_settings = PoolSettings {
|
||||||
pool_mode: PoolMode::Transaction,
|
pool_mode: PoolMode::Transaction,
|
||||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||||
checkout_failure_limit: None,
|
|
||||||
shards: 2,
|
shards: 2,
|
||||||
user: crate::config::User::default(),
|
user: crate::config::User::default(),
|
||||||
default_role: Some(Role::Replica),
|
default_role: Some(Role::Replica),
|
||||||
@@ -1700,7 +1699,6 @@ mod test {
|
|||||||
let pool_settings = PoolSettings {
|
let pool_settings = PoolSettings {
|
||||||
pool_mode: PoolMode::Transaction,
|
pool_mode: PoolMode::Transaction,
|
||||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||||
checkout_failure_limit: Some(10),
|
|
||||||
shards: 5,
|
shards: 5,
|
||||||
user: crate::config::User::default(),
|
user: crate::config::User::default(),
|
||||||
default_role: Some(Role::Replica),
|
default_role: Some(Role::Replica),
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM rust:1.81.0-slim-bookworm
|
FROM rust:bullseye
|
||||||
|
|
||||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||||
RUN /bin/yj -h
|
RUN /bin/yj -h
|
||||||
|
|||||||
@@ -188,102 +188,6 @@ describe "Miscellaneous" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Checkout failure limit" do
|
|
||||||
context "when no checkout failure limit is set" do
|
|
||||||
before do
|
|
||||||
new_configs = processes.pgcat.current_config
|
|
||||||
new_configs["general"]["connect_timeout"] = 200
|
|
||||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
|
||||||
processes.pgcat.update_config(new_configs)
|
|
||||||
processes.pgcat.reload_config
|
|
||||||
sleep 0.5
|
|
||||||
end
|
|
||||||
|
|
||||||
it "does not disconnect client" do
|
|
||||||
Array.new(5) do
|
|
||||||
Thread.new do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
for i in 0..4
|
|
||||||
begin
|
|
||||||
conn.async_exec("SELECT pg_sleep(0.5);")
|
|
||||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
|
||||||
rescue PG::SystemError
|
|
||||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
end.each(&:join)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "when checkout failure limit is set high" do
|
|
||||||
before do
|
|
||||||
new_configs = processes.pgcat.current_config
|
|
||||||
new_configs["general"]["connect_timeout"] = 200
|
|
||||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
|
||||||
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 10000
|
|
||||||
processes.pgcat.update_config(new_configs)
|
|
||||||
processes.pgcat.reload_config
|
|
||||||
sleep 0.5
|
|
||||||
end
|
|
||||||
|
|
||||||
it "does not disconnect client" do
|
|
||||||
Array.new(5) do
|
|
||||||
Thread.new do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
for i in 0..4
|
|
||||||
begin
|
|
||||||
conn.async_exec("SELECT pg_sleep(0.5);")
|
|
||||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
|
||||||
rescue PG::SystemError
|
|
||||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
end.each(&:join)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "when checkout failure limit is set low" do
|
|
||||||
before do
|
|
||||||
new_configs = processes.pgcat.current_config
|
|
||||||
new_configs["general"]["connect_timeout"] = 200
|
|
||||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
|
||||||
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 2
|
|
||||||
processes.pgcat.update_config(new_configs)
|
|
||||||
processes.pgcat.reload_config
|
|
||||||
sleep 0.5
|
|
||||||
end
|
|
||||||
|
|
||||||
it "disconnects client after reaching limit" do
|
|
||||||
Array.new(5) do
|
|
||||||
Thread.new do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
checkout_failure_count = 0
|
|
||||||
for i in 0..4
|
|
||||||
begin
|
|
||||||
conn.async_exec("SELECT pg_sleep(1);")
|
|
||||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
|
||||||
rescue PG::SystemError
|
|
||||||
checkout_failure_count += 1
|
|
||||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
|
||||||
rescue PG::ConnectionBad
|
|
||||||
expect(checkout_failure_count).to eq(2)
|
|
||||||
expect(conn.status).to eq(PG::CONNECTION_BAD)
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
end.each(&:join)
|
|
||||||
puts processes.pgcat.logs
|
|
||||||
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "Server version reporting" do
|
describe "Server version reporting" do
|
||||||
it "reports correct version for normal and admin databases" do
|
it "reports correct version for normal and admin databases" do
|
||||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
|||||||
Reference in New Issue
Block a user