Reset instead of discard all (#549)

* Use reset all instead of discard all

* Move 'X' handling to before admin handle

* fix tests
This commit is contained in:
Zain Kabani
2023-08-16 13:08:48 -04:00
committed by GitHub
parent 4f0f45b576
commit bb27586758
3 changed files with 35 additions and 26 deletions

View File

@@ -821,6 +821,14 @@ where
message_result = read_message(&mut self.read) => message_result? message_result = read_message(&mut self.read) => message_result?
}; };
if message[0] as char == 'X' {
debug!("Client disconnecting");
self.stats.disconnect();
return Ok(());
}
// Handle admin database queries. // Handle admin database queries.
if self.admin { if self.admin {
debug!("Handling admin command"); debug!("Handling admin command");
@@ -940,14 +948,6 @@ where
continue; continue;
} }
'X' => {
debug!("Client disconnecting");
self.stats.disconnect();
return Ok(());
}
// Close (F) // Close (F)
'C' => { 'C' => {
if prepared_statements_enabled { if prepared_statements_enabled {

View File

@@ -107,10 +107,10 @@ impl StreamInner {
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
struct CleanupState { struct CleanupState {
/// If server connection requires DISCARD ALL before checkin because of set statement /// If server connection requires RESET ALL before checkin because of set statement
needs_cleanup_set: bool, needs_cleanup_set: bool,
/// If server connection requires DISCARD ALL before checkin because of prepare statement /// If server connection requires DEALLOCATE ALL before checkin because of prepare statement
needs_cleanup_prepare: bool, needs_cleanup_prepare: bool,
} }
@@ -296,7 +296,7 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so. /// Is the server broken? We'll remote it from the pool if so.
bad: bool, bad: bool,
/// If server connection requires DISCARD ALL before checkin /// If server connection requires reset statements before checkin
cleanup_state: CleanupState, cleanup_state: CleanupState,
/// Mapping of clients and servers used for query cancellation. /// Mapping of clients and servers used for query cancellation.
@@ -982,7 +982,7 @@ impl Server {
// We don't detect set statements in transactions // We don't detect set statements in transactions
// No great way to differentiate between set and set local // No great way to differentiate between set and set local
// As a result, we will miss cases when set statements are used in transactions // As a result, we will miss cases when set statements are used in transactions
// This will reduce amount of discard statements sent // This will reduce amount of reset statements sent
if !self.in_transaction { if !self.in_transaction {
debug!("Server connection marked for clean up"); debug!("Server connection marked for clean up");
self.cleanup_state.needs_cleanup_set = true; self.cleanup_state.needs_cleanup_set = true;
@@ -1304,12 +1304,21 @@ impl Server {
// Client disconnected but it performed session-altering operations such as // Client disconnected but it performed session-altering operations such as
// SET statement_timeout to 1 or create a prepared statement. We clear that // SET statement_timeout to 1 or create a prepared statement. We clear that
// to avoid leaking state between clients. For performance reasons we only // to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending // send `RESET ALL` if we think the session is altered instead of just sending
// it before each checkin. // it before each checkin.
if self.cleanup_state.needs_cleanup() && self.cleanup_connections { if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name); info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?; let mut reset_string = String::from("RESET ROLE;");
self.query("RESET ROLE").await?;
if self.cleanup_state.needs_cleanup_set {
reset_string.push_str("RESET ALL;");
};
if self.cleanup_state.needs_cleanup_prepare {
reset_string.push_str("DEALLOCATE ALL;");
};
self.query(&reset_string).await?;
self.cleanup_state.reset(); self.cleanup_state.reset();
} }
@@ -1336,7 +1345,7 @@ impl Server {
self.last_activity self.last_activity
} }
// Marks a connection as needing DISCARD ALL at checkin // Marks a connection as needing cleanup at checkin
pub fn mark_dirty(&mut self) { pub fn mark_dirty(&mut self) {
self.cleanup_state.set_true(); self.cleanup_state.set_true();
} }

View File

@@ -221,7 +221,7 @@ describe "Miscellaneous" do
conn.close conn.close
end end
it "Does not send DISCARD ALL unless necessary" do it "Does not send RESET ALL unless necessary" do
10.times do 10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'") conn.async_exec("SET SERVER ROLE to 'primary'")
@@ -229,7 +229,7 @@ describe "Miscellaneous" do
conn.close conn.close
end end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0) expect(processes.primary.count_query("RESET ALL")).to eq(0)
10.times do 10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -239,7 +239,7 @@ describe "Miscellaneous" do
conn.close conn.close
end end
expect(processes.primary.count_query("DISCARD ALL")).to eq(10) expect(processes.primary.count_query("RESET ALL")).to eq(10)
end end
it "Resets server roles correctly" do it "Resets server roles correctly" do
@@ -273,7 +273,7 @@ describe "Miscellaneous" do
end end
end end
it "Does not send DISCARD ALL unless necessary" do it "Does not send RESET ALL unless necessary" do
10.times do 10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'") conn.async_exec("SET SERVER ROLE to 'primary'")
@@ -282,7 +282,7 @@ describe "Miscellaneous" do
conn.close conn.close
end end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0) expect(processes.primary.count_query("RESET ALL")).to eq(0)
10.times do 10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -292,7 +292,7 @@ describe "Miscellaneous" do
conn.close conn.close
end end
expect(processes.primary.count_query("DISCARD ALL")).to eq(10) expect(processes.primary.count_query("RESET ALL")).to eq(10)
end end
it "Respects tracked parameters on startup" do it "Respects tracked parameters on startup" do
@@ -331,7 +331,7 @@ describe "Miscellaneous" do
conn.async_exec("COMMIT") conn.async_exec("COMMIT")
conn.close conn.close
end end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0) expect(processes.primary.count_query("RESET ALL")).to eq(0)
10.times do 10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -341,7 +341,7 @@ describe "Miscellaneous" do
conn.async_exec("COMMIT") conn.async_exec("COMMIT")
conn.close conn.close
end end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0) expect(processes.primary.count_query("RESET ALL")).to eq(0)
end end
end end
@@ -355,7 +355,7 @@ describe "Miscellaneous" do
conn.close conn.close
puts processes.pgcat.logs puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0) expect(processes.primary.count_query("RESET ALL")).to eq(0)
end end
it "will not clean up prepared statements" do it "will not clean up prepared statements" do
@@ -366,7 +366,7 @@ describe "Miscellaneous" do
conn.close conn.close
puts processes.pgcat.logs puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0) expect(processes.primary.count_query("RESET ALL")).to eq(0)
end end
end end
end end