Improve logging for connection cleanup (#428)

* initial commit

* fix

* fmt
This commit is contained in:
Zain Kabani
2023-05-11 20:40:10 -04:00
committed by GitHub
parent 73260690b0
commit 0907f1b77f

View File

@@ -103,6 +103,48 @@ impl StreamInner {
} }
} }
#[derive(Copy, Clone)]
struct CleanupState {
/// If server connection requires DISCARD ALL before checkin because of set statement
needs_cleanup_set: bool,
/// If server connection requires DISCARD ALL before checkin because of prepare statement
needs_cleanup_prepare: bool,
}
impl CleanupState {
fn new() -> Self {
CleanupState {
needs_cleanup_set: false,
needs_cleanup_prepare: false,
}
}
fn needs_cleanup(&self) -> bool {
self.needs_cleanup_set || self.needs_cleanup_prepare
}
fn set_true(&mut self) {
self.needs_cleanup_set = true;
self.needs_cleanup_prepare = true;
}
fn reset(&mut self) {
self.needs_cleanup_set = false;
self.needs_cleanup_prepare = false;
}
}
impl std::fmt::Display for CleanupState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SET: {}, PREPARE: {}",
self.needs_cleanup_set, self.needs_cleanup_prepare
)
}
}
/// Server state. /// Server state.
pub struct Server { pub struct Server {
/// Server host, e.g. localhost, /// Server host, e.g. localhost,
@@ -131,8 +173,8 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so. /// Is the server broken? We'll remote it from the pool if so.
bad: bool, bad: bool,
/// If server connection requires a DISCARD ALL before checkin /// If server connection requires DISCARD ALL before checkin
needs_cleanup: bool, cleanup_state: CleanupState,
/// Mapping of clients and servers used for query cancellation. /// Mapping of clients and servers used for query cancellation.
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
@@ -630,7 +672,7 @@ impl Server {
in_transaction: false, in_transaction: false,
data_available: false, data_available: false,
bad: false, bad: false,
needs_cleanup: false, cleanup_state: CleanupState::new(),
client_server_map, client_server_map,
addr_set, addr_set,
connected_at: chrono::offset::Utc::now().naive_utc(), connected_at: chrono::offset::Utc::now().naive_utc(),
@@ -793,12 +835,12 @@ impl Server {
// This will reduce amount of discard statements sent // This will reduce amount of discard statements sent
if !self.in_transaction { if !self.in_transaction {
debug!("Server connection marked for clean up"); debug!("Server connection marked for clean up");
self.needs_cleanup = true; self.cleanup_state.needs_cleanup_set = true;
} }
} }
"PREPARE\0" => { "PREPARE\0" => {
debug!("Server connection marked for clean up"); debug!("Server connection marked for clean up");
self.needs_cleanup = true; self.cleanup_state.needs_cleanup_prepare = true;
} }
_ => (), _ => (),
} }
@@ -960,11 +1002,11 @@ impl Server {
// to avoid leaking state between clients. For performance reasons we only // to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending // send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin. // it before each checkin.
if self.needs_cleanup { if self.cleanup_state.needs_cleanup() {
warn!("Server returned with session state altered, discarding state"); warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?; self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?; self.query("RESET ROLE").await?;
self.needs_cleanup = false; self.cleanup_state.reset();
} }
Ok(()) Ok(())
@@ -976,12 +1018,12 @@ impl Server {
self.application_name = name.to_string(); self.application_name = name.to_string();
// We don't want `SET application_name` to mark the server connection // We don't want `SET application_name` to mark the server connection
// as needing cleanup // as needing cleanup
let needs_cleanup_before = self.needs_cleanup; let needs_cleanup_before = self.cleanup_state;
let result = Ok(self let result = Ok(self
.query(&format!("SET application_name = '{}'", name)) .query(&format!("SET application_name = '{}'", name))
.await?); .await?);
self.needs_cleanup = needs_cleanup_before; self.cleanup_state = needs_cleanup_before;
result result
} else { } else {
Ok(()) Ok(())
@@ -1006,7 +1048,7 @@ impl Server {
// Marks a connection as needing DISCARD ALL at checkin // Marks a connection as needing DISCARD ALL at checkin
pub fn mark_dirty(&mut self) { pub fn mark_dirty(&mut self) {
self.needs_cleanup = true; self.cleanup_state.set_true();
} }
pub fn mirror_send(&mut self, bytes: &BytesMut) { pub fn mirror_send(&mut self, bytes: &BytesMut) {