From 5948fef6cf97bc9ca7c720f89f63793a22705f4f Mon Sep 17 00:00:00 2001 From: zainkabani <77307340+zainkabani@users.noreply.github.com> Date: Thu, 18 Aug 2022 08:12:38 -0400 Subject: [PATCH] Minor Refactoring of re-used code and server stat reporting (#129) * Minor changes to stats reporting and recduce re-used code * fmt --- src/client.rs | 112 +++++++++++++++++++++----------------------------- src/pool.rs | 4 +- 2 files changed, 49 insertions(+), 67 deletions(-) diff --git a/src/client.rs b/src/client.rs index b36eae0..278cda8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -667,7 +667,6 @@ where .client_disconnecting(self.process_id, last_address_id); } self.stats.client_active(self.process_id, address.id); - self.stats.server_active(server.process_id(), address.id); self.last_address_id = Some(address.id); self.last_server_id = Some(server.process_id()); @@ -731,44 +730,16 @@ where 'Q' => { debug!("Sending query to server"); - self.send_server_message( - server, + self.send_and_receive_loop( + code, original, + server, &address, query_router.shard(), &pool, ) .await?; - // Read all data the server has to offer, which can be multiple messages - // buffered in 8196 bytes chunks. - loop { - let response = self - .receive_server_message( - server, - &address, - query_router.shard(), - &pool, - ) - .await?; - - // Send server reply to the client. - match write_all_half(&mut self.write, response).await { - Ok(_) => (), - Err(err) => { - server.mark_bad(); - return Err(err); - } - }; - - if !server.is_data_available() { - break; - } - } - - // Report query executed statistics. - self.stats.query(self.process_id, address.id); - if !server.in_transaction() { // Report transaction executed statistics. self.stats.transaction(self.process_id, address.id); @@ -776,7 +747,6 @@ where // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if self.transaction_mode { - self.stats.server_idle(server.process_id(), address.id); break; } } @@ -830,9 +800,10 @@ where self.buffer.put(&original[..]); - self.send_server_message( - server, + self.send_and_receive_loop( + code, self.buffer.clone(), + server, &address, query_router.shard(), &pool, @@ -841,41 +812,12 @@ where self.buffer.clear(); - // Read all data the server has to offer, which can be multiple messages - // buffered in 8196 bytes chunks. - loop { - let response = self - .receive_server_message( - server, - &address, - query_router.shard(), - &pool, - ) - .await?; - - match write_all_half(&mut self.write, response).await { - Ok(_) => (), - Err(err) => { - server.mark_bad(); - return Err(err); - } - }; - - if !server.is_data_available() { - break; - } - } - - // Report query executed statistics. - self.stats.query(self.process_id, address.id); - if !server.in_transaction() { self.stats.transaction(self.process_id, address.id); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if self.transaction_mode { - self.stats.server_idle(server.process_id(), address.id); break; } } @@ -925,7 +867,6 @@ where // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if self.transaction_mode { - self.stats.server_idle(server.process_id(), address.id); break; } } @@ -941,6 +882,7 @@ where // The server is no longer bound to us, we can't cancel it's queries anymore. debug!("Releasing server back into the pool"); + self.stats.server_idle(server.process_id(), address.id); self.release(); self.stats.client_idle(self.process_id, address.id); } @@ -952,6 +894,46 @@ where guard.remove(&(self.process_id, self.secret_key)); } + async fn send_and_receive_loop( + &mut self, + code: char, + message: BytesMut, + server: &mut Server, + address: &Address, + shard: usize, + pool: &ConnectionPool, + ) -> Result<(), Error> { + debug!("Sending {} to server", code); + + self.send_server_message(server, message, &address, shard, &pool) + .await?; + + // Read all data the server has to offer, which can be multiple messages + // buffered in 8196 bytes chunks. + loop { + let response = self + .receive_server_message(server, &address, shard, &pool) + .await?; + + match write_all_half(&mut self.write, response).await { + Ok(_) => (), + Err(err) => { + server.mark_bad(); + return Err(err); + } + }; + + if !server.is_data_available() { + break; + } + } + + // Report query executed statistics. + self.stats.query(self.process_id, address.id); + + Ok(()) + } + async fn send_server_message( &self, server: &mut Server, diff --git a/src/pool.rs b/src/pool.rs index 4a70078..5684d54 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -335,7 +335,7 @@ impl ConnectionPool { if !require_healthcheck { self.stats .checkout_time(now.elapsed().as_micros(), process_id, address.id); - self.stats.server_idle(conn.process_id(), address.id); + self.stats.server_active(conn.process_id(), address.id); return Ok((conn, address.clone())); } @@ -354,7 +354,7 @@ impl ConnectionPool { Ok(_) => { self.stats .checkout_time(now.elapsed().as_micros(), process_id, address.id); - self.stats.server_idle(conn.process_id(), address.id); + self.stats.server_active(conn.process_id(), address.id); return Ok((conn, address.clone())); }