diff --git a/CONFIG.md b/CONFIG.md index 188a2b1..bcd6f09 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -49,6 +49,14 @@ default: 30000 # milliseconds How long an idle connection with a server is left open (ms). +### idle_client_in_transaction_timeout +``` +path: general.idle_client_in_transaction_timeout +default: 0 # milliseconds +``` + +How long a client is allowed to be idle while in a transaction (ms). + ### healthcheck_timeout ``` path: general.healthcheck_timeout diff --git a/pgcat.toml b/pgcat.toml index 7903cdf..0d883a3 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -23,6 +23,9 @@ connect_timeout = 5000 # milliseconds # How long an idle connection with a server is left open (ms). idle_timeout = 30000 # milliseconds +# How long a client is allowed to be idle while in a transaction (ms). +idle_client_in_transaction_timeout = 0 # milliseconds + # How much time to give the health check query to return with a result (ms). healthcheck_timeout = 1000 # milliseconds diff --git a/src/client.rs b/src/client.rs index bb70a65..68f6f57 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,7 +12,7 @@ use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::Sender; use crate::admin::{generate_server_info_for_admin, handle_admin}; -use crate::config::{get_config, Address, PoolMode}; +use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode}; use crate::constants::*; use crate::messages::*; @@ -859,6 +859,11 @@ where let mut initial_message = Some(message); + let idle_client_timeout_duration = match get_idle_client_in_transaction_timeout() { + 0 => tokio::time::Duration::MAX, + timeout => tokio::time::Duration::from_millis(timeout), + }; + // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, // or until the client disconnects if we are in session mode. @@ -870,15 +875,26 @@ where None => { trace!("Waiting for message inside transaction or in session mode"); - match read_message(&mut self.read).await { - Ok(message) => message, - Err(err) => { + match tokio::time::timeout( + idle_client_timeout_duration, + read_message(&mut self.read), + ) + .await + { + Ok(Ok(message)) => message, + Ok(Err(err)) => { // Client disconnected inside a transaction. // Clean up the server and re-use it. server.checkin_cleanup().await?; return Err(err); } + Err(_) => { + // Client idle in transaction timeout + error_response(&mut self.write, "idle transaction timeout").await?; + error!("Client idle in transaction timeout: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\"}}", self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role()); + break; + } } } Some(message) => { diff --git a/src/config.rs b/src/config.rs index 5adf8d5..a3182e2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -197,6 +197,9 @@ pub struct General { #[serde(default = "General::default_ban_time")] pub ban_time: i64, + #[serde(default = "General::default_idle_client_in_transaction_timeout")] + pub idle_client_in_transaction_timeout: u64, + #[serde(default = "General::default_worker_threads")] pub worker_threads: usize, @@ -260,6 +263,10 @@ impl General { pub fn default_worker_threads() -> usize { 4 } + + pub fn default_idle_client_in_transaction_timeout() -> u64 { + 0 + } } impl Default for General { @@ -276,6 +283,7 @@ impl Default for General { healthcheck_delay: Self::default_healthcheck_delay(), ban_time: Self::default_ban_time(), worker_threads: Self::default_worker_threads(), + idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(), tcp_keepalives_idle: Self::default_tcp_keepalives_idle(), tcp_keepalives_count: Self::default_tcp_keepalives_count(), tcp_keepalives_interval: Self::default_tcp_keepalives_interval(), @@ -655,6 +663,13 @@ impl From<&Config> for std::collections::HashMap { config.general.healthcheck_delay.to_string(), ), ("ban_time".to_string(), config.general.ban_time.to_string()), + ( + "idle_client_in_transaction_timeout".to_string(), + config + .general + .idle_client_in_transaction_timeout + .to_string(), + ), ]; r.append(&mut static_settings); @@ -666,6 +681,10 @@ impl Config { /// Print current configuration. pub fn show(&self) { info!("Ban time: {}s", self.general.ban_time); + info!( + "Idle client in transaction timeout: {}ms", + self.general.idle_client_in_transaction_timeout + ); info!("Worker threads: {}", self.general.worker_threads); info!( "Healthcheck timeout: {}ms", @@ -819,6 +838,12 @@ pub fn get_config() -> Config { (*(*CONFIG.load())).clone() } +pub fn get_idle_client_in_transaction_timeout() -> u64 { + (*(*CONFIG.load())) + .general + .idle_client_in_transaction_timeout +} + /// Parse the configuration file located at the path. pub async fn parse(path: &str) -> Result<(), Error> { let mut contents = String::new(); @@ -889,6 +914,7 @@ mod test { assert_eq!(get_config().path, "pgcat.toml".to_string()); assert_eq!(get_config().general.ban_time, 60); + assert_eq!(get_config().general.idle_client_in_transaction_timeout, 0); assert_eq!(get_config().general.idle_timeout, 30000); assert_eq!(get_config().pools.len(), 2); assert_eq!(get_config().pools["sharded_db"].shards.len(), 3); diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index 2f69fb4..1a04c5d 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -309,4 +309,58 @@ describe "Miscellaneous" do end end end + + describe "Idle client timeout" do + context "idle transaction timeout set to 0" do + before do + current_configs = processes.pgcat.current_config + correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"] + puts(current_configs["general"]["idle_client_in_transaction_timeout"]) + + current_configs["general"]["idle_client_in_transaction_timeout"] = 0 + + processes.pgcat.update_config(current_configs) # with timeout 0 + processes.pgcat.reload_config + end + + it "Allow client to be idle in transaction" do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("BEGIN") + conn.async_exec("SELECT 1") + sleep(2) + conn.async_exec("COMMIT") + conn.close + end + end + + context "idle transaction timeout set to 500ms" do + before do + current_configs = processes.pgcat.current_config + correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"] + current_configs["general"]["idle_client_in_transaction_timeout"] = 500 + + processes.pgcat.update_config(current_configs) # with timeout 500 + processes.pgcat.reload_config + end + + it "Allow client to be idle in transaction below timeout" do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("BEGIN") + conn.async_exec("SELECT 1") + sleep(0.4) # below 500ms + conn.async_exec("COMMIT") + conn.close + end + + it "Error when client idle in transaction time exceeds timeout" do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("BEGIN") + conn.async_exec("SELECT 1") + sleep(1) # above 500ms + expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/) + conn.async_exec("SELECT 1") # should be able to send another query + conn.close + end + end + end end