From 993719333294b0e5066279f47a8d8bb962604cee Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Tue, 29 Aug 2023 10:07:36 -0700 Subject: [PATCH] Allow pause/resuming all pools (#566) support pausing all pools --- src/admin.rs | 172 +++++++++++++++++++++++---------------- tests/ruby/admin_spec.rb | 24 ++++++ 2 files changed, 126 insertions(+), 70 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index f27b2a0..da92529 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -74,11 +74,11 @@ where } "PAUSE" => { trace!("PAUSE"); - pause(stream, query_parts[1]).await + pause(stream, query_parts).await } "RESUME" => { trace!("RESUME"); - resume(stream, query_parts[1]).await + resume(stream, query_parts).await } "SHUTDOWN" => { trace!("SHUTDOWN"); @@ -797,96 +797,128 @@ where } /// Pause a pool. It won't pass any more queries to the backends. -async fn pause(stream: &mut T, query: &str) -> Result<(), Error> +async fn pause(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error> where T: tokio::io::AsyncWrite + std::marker::Unpin, { - let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect(); + let parts: Vec<&str> = match tokens.len() == 2 { + true => tokens[1].split(",").map(|part| part.trim()).collect(), + false => Vec::new(), + }; - if parts.len() != 2 { - error_response( - stream, - "PAUSE requires a database and a user, e.g. PAUSE my_db,my_user", - ) - .await - } else { - let database = parts[0]; - let user = parts[1]; - - match get_pool(database, user) { - Some(pool) => { + match parts.len() { + 0 => { + for (_, pool) in get_all_pools() { pool.pause(); - - let mut res = BytesMut::new(); - - res.put(command_complete(&format!("PAUSE {},{}", database, user))); - - // ReadyForQuery - res.put_u8(b'Z'); - res.put_i32(5); - res.put_u8(b'I'); - - write_all_half(stream, &res).await } - None => { - error_response( - stream, - &format!( - "No pool configured for database: {}, user: {}", - database, user - ), - ) - .await + let mut res = BytesMut::new(); + + res.put(command_complete("PAUSE")); + + // ReadyForQuery + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, &res).await + } + 2 => { + let database = parts[0]; + let user = parts[1]; + + match get_pool(database, user) { + Some(pool) => { + pool.pause(); + + let mut res = BytesMut::new(); + + res.put(command_complete(&format!("PAUSE {},{}", database, user))); + + // ReadyForQuery + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, &res).await + } + + None => { + error_response( + stream, + &format!( + "No pool configured for database: {}, user: {}", + database, user + ), + ) + .await + } } } + _ => error_response(stream, "usage: PAUSE [db, user]").await, } } /// Resume a pool. Queries are allowed again. -async fn resume(stream: &mut T, query: &str) -> Result<(), Error> +async fn resume(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error> where T: tokio::io::AsyncWrite + std::marker::Unpin, { - let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect(); + let parts: Vec<&str> = match tokens.len() == 2 { + true => tokens[1].split(",").map(|part| part.trim()).collect(), + false => Vec::new(), + }; - if parts.len() != 2 { - error_response( - stream, - "RESUME requires a database and a user, e.g. RESUME my_db,my_user", - ) - .await - } else { - let database = parts[0]; - let user = parts[1]; - - match get_pool(database, user) { - Some(pool) => { + match parts.len() { + 0 => { + for (_, pool) in get_all_pools() { pool.resume(); - - let mut res = BytesMut::new(); - - res.put(command_complete(&format!("RESUME {},{}", database, user))); - - // ReadyForQuery - res.put_u8(b'Z'); - res.put_i32(5); - res.put_u8(b'I'); - - write_all_half(stream, &res).await } - None => { - error_response( - stream, - &format!( - "No pool configured for database: {}, user: {}", - database, user - ), - ) - .await + let mut res = BytesMut::new(); + + res.put(command_complete("RESUME")); + + // ReadyForQuery + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, &res).await + } + 2 => { + let database = parts[0]; + let user = parts[1]; + + match get_pool(database, user) { + Some(pool) => { + pool.resume(); + + let mut res = BytesMut::new(); + + res.put(command_complete(&format!("RESUME {},{}", database, user))); + + // ReadyForQuery + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, &res).await + } + + None => { + error_response( + stream, + &format!( + "No pool configured for database: {}, user: {}", + database, user + ), + ) + .await + } } } + _ => error_response(stream, "usage: RESUME [db, user]").await, } } diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index 9a85235..abaa5ff 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -90,4 +90,28 @@ describe "Admin" do expect(results["pool_mode"]).to eq("transaction") end end + + describe "PAUSE" do + it "pauses all pools" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW DATABASES").to_a + expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"]) + + admin_conn.async_exec("PAUSE") + + results = admin_conn.async_exec("SHOW DATABASES").to_a + expect(results.map{ |r| r["paused"] }.uniq).to eq(["1"]) + + admin_conn.async_exec("RESUME") + + results = admin_conn.async_exec("SHOW DATABASES").to_a + expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"]) + end + + it "handles errors" do + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + expect { admin_conn.async_exec("PAUSE foo").to_a }.to raise_error(PG::SystemError) + expect { admin_conn.async_exec("PAUSE foo,bar").to_a }.to raise_error(PG::SystemError) + end + end end