Add idle client in transaction configuration (#380)

* Add idle client in transaction configuration

* fmt

* Update docs

* trigger build

* Add tests

* Make the config dynamic from reloads

* fmt

* comments

* trigger build

* fix config.md

* remove error
This commit is contained in:
Zain Kabani
2023-03-24 11:20:30 -04:00
committed by GitHub
parent d66b377a8e
commit ca4431b67e
5 changed files with 111 additions and 4 deletions

View File

@@ -49,6 +49,14 @@ default: 30000 # milliseconds
How long an idle connection with a server is left open (ms). How long an idle connection with a server is left open (ms).
### idle_client_in_transaction_timeout
```
path: general.idle_client_in_transaction_timeout
default: 0 # milliseconds
```
How long a client is allowed to be idle while in a transaction (ms).
### healthcheck_timeout ### healthcheck_timeout
``` ```
path: general.healthcheck_timeout path: general.healthcheck_timeout

View File

@@ -23,6 +23,9 @@ connect_timeout = 5000 # milliseconds
# How long an idle connection with a server is left open (ms). # How long an idle connection with a server is left open (ms).
idle_timeout = 30000 # milliseconds idle_timeout = 30000 # milliseconds
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout = 0 # milliseconds
# How much time to give the health check query to return with a result (ms). # How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000 # milliseconds healthcheck_timeout = 1000 # milliseconds

View File

@@ -12,7 +12,7 @@ use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_info_for_admin, handle_admin}; use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::config::{get_config, Address, PoolMode}; use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
use crate::constants::*; use crate::constants::*;
use crate::messages::*; use crate::messages::*;
@@ -859,6 +859,11 @@ where
let mut initial_message = Some(message); let mut initial_message = Some(message);
let idle_client_timeout_duration = match get_idle_client_in_transaction_timeout() {
0 => tokio::time::Duration::MAX,
timeout => tokio::time::Duration::from_millis(timeout),
};
// Transaction loop. Multiple queries can be issued by the client here. // Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over, // The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode. // or until the client disconnects if we are in session mode.
@@ -870,15 +875,26 @@ where
None => { None => {
trace!("Waiting for message inside transaction or in session mode"); trace!("Waiting for message inside transaction or in session mode");
match read_message(&mut self.read).await { match tokio::time::timeout(
Ok(message) => message, idle_client_timeout_duration,
Err(err) => { read_message(&mut self.read),
)
.await
{
Ok(Ok(message)) => message,
Ok(Err(err)) => {
// Client disconnected inside a transaction. // Client disconnected inside a transaction.
// Clean up the server and re-use it. // Clean up the server and re-use it.
server.checkin_cleanup().await?; server.checkin_cleanup().await?;
return Err(err); return Err(err);
} }
Err(_) => {
// Client idle in transaction timeout
error_response(&mut self.write, "idle transaction timeout").await?;
error!("Client idle in transaction timeout: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\"}}", self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role());
break;
}
} }
} }
Some(message) => { Some(message) => {

View File

@@ -197,6 +197,9 @@ pub struct General {
#[serde(default = "General::default_ban_time")] #[serde(default = "General::default_ban_time")]
pub ban_time: i64, pub ban_time: i64,
#[serde(default = "General::default_idle_client_in_transaction_timeout")]
pub idle_client_in_transaction_timeout: u64,
#[serde(default = "General::default_worker_threads")] #[serde(default = "General::default_worker_threads")]
pub worker_threads: usize, pub worker_threads: usize,
@@ -260,6 +263,10 @@ impl General {
pub fn default_worker_threads() -> usize { pub fn default_worker_threads() -> usize {
4 4
} }
pub fn default_idle_client_in_transaction_timeout() -> u64 {
0
}
} }
impl Default for General { impl Default for General {
@@ -276,6 +283,7 @@ impl Default for General {
healthcheck_delay: Self::default_healthcheck_delay(), healthcheck_delay: Self::default_healthcheck_delay(),
ban_time: Self::default_ban_time(), ban_time: Self::default_ban_time(),
worker_threads: Self::default_worker_threads(), worker_threads: Self::default_worker_threads(),
idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(),
tcp_keepalives_idle: Self::default_tcp_keepalives_idle(), tcp_keepalives_idle: Self::default_tcp_keepalives_idle(),
tcp_keepalives_count: Self::default_tcp_keepalives_count(), tcp_keepalives_count: Self::default_tcp_keepalives_count(),
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(), tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
@@ -655,6 +663,13 @@ impl From<&Config> for std::collections::HashMap<String, String> {
config.general.healthcheck_delay.to_string(), config.general.healthcheck_delay.to_string(),
), ),
("ban_time".to_string(), config.general.ban_time.to_string()), ("ban_time".to_string(), config.general.ban_time.to_string()),
(
"idle_client_in_transaction_timeout".to_string(),
config
.general
.idle_client_in_transaction_timeout
.to_string(),
),
]; ];
r.append(&mut static_settings); r.append(&mut static_settings);
@@ -666,6 +681,10 @@ impl Config {
/// Print current configuration. /// Print current configuration.
pub fn show(&self) { pub fn show(&self) {
info!("Ban time: {}s", self.general.ban_time); info!("Ban time: {}s", self.general.ban_time);
info!(
"Idle client in transaction timeout: {}ms",
self.general.idle_client_in_transaction_timeout
);
info!("Worker threads: {}", self.general.worker_threads); info!("Worker threads: {}", self.general.worker_threads);
info!( info!(
"Healthcheck timeout: {}ms", "Healthcheck timeout: {}ms",
@@ -819,6 +838,12 @@ pub fn get_config() -> Config {
(*(*CONFIG.load())).clone() (*(*CONFIG.load())).clone()
} }
pub fn get_idle_client_in_transaction_timeout() -> u64 {
(*(*CONFIG.load()))
.general
.idle_client_in_transaction_timeout
}
/// Parse the configuration file located at the path. /// Parse the configuration file located at the path.
pub async fn parse(path: &str) -> Result<(), Error> { pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new(); let mut contents = String::new();
@@ -889,6 +914,7 @@ mod test {
assert_eq!(get_config().path, "pgcat.toml".to_string()); assert_eq!(get_config().path, "pgcat.toml".to_string());
assert_eq!(get_config().general.ban_time, 60); assert_eq!(get_config().general.ban_time, 60);
assert_eq!(get_config().general.idle_client_in_transaction_timeout, 0);
assert_eq!(get_config().general.idle_timeout, 30000); assert_eq!(get_config().general.idle_timeout, 30000);
assert_eq!(get_config().pools.len(), 2); assert_eq!(get_config().pools.len(), 2);
assert_eq!(get_config().pools["sharded_db"].shards.len(), 3); assert_eq!(get_config().pools["sharded_db"].shards.len(), 3);

View File

@@ -309,4 +309,58 @@ describe "Miscellaneous" do
end end
end end
end end
describe "Idle client timeout" do
context "idle transaction timeout set to 0" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
puts(current_configs["general"]["idle_client_in_transaction_timeout"])
current_configs["general"]["idle_client_in_transaction_timeout"] = 0
processes.pgcat.update_config(current_configs) # with timeout 0
processes.pgcat.reload_config
end
it "Allow client to be idle in transaction" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(2)
conn.async_exec("COMMIT")
conn.close
end
end
context "idle transaction timeout set to 500ms" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
current_configs["general"]["idle_client_in_transaction_timeout"] = 500
processes.pgcat.update_config(current_configs) # with timeout 500
processes.pgcat.reload_config
end
it "Allow client to be idle in transaction below timeout" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(0.4) # below 500ms
conn.async_exec("COMMIT")
conn.close
end
it "Error when client idle in transaction time exceeds timeout" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(1) # above 500ms
expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
conn.async_exec("SELECT 1") # should be able to send another query
conn.close
end
end
end
end end