Compare commits

...

3 Commits

Author SHA1 Message Date
Lev Kokotov
d6a13d047d Drop in-transaction connections for now 2022-08-18 13:31:15 -07:00
zainkabani
5948fef6cf Minor Refactoring of re-used code and server stat reporting (#129)
* Minor changes to stats reporting and recduce re-used code

* fmt
2022-08-18 05:12:38 -07:00
Mostafa Abdelraouf
790898c20e Add pool name and username to address object (#128)
* Add pool name and username to address object

* Fix address name

* fmt
2022-08-17 08:40:47 -07:00
3 changed files with 65 additions and 73 deletions

View File

@@ -667,7 +667,6 @@ where
.client_disconnecting(self.process_id, last_address_id);
}
self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id(), address.id);
self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id());
@@ -731,44 +730,16 @@ where
'Q' => {
debug!("Sending query to server");
self.send_server_message(
server,
self.send_and_receive_loop(
code,
original,
server,
&address,
query_router.shard(),
&pool,
)
.await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
// Send server reply to the client.
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
if !server.in_transaction() {
// Report transaction executed statistics.
self.stats.transaction(self.process_id, address.id);
@@ -776,7 +747,6 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
@@ -789,9 +759,13 @@ where
// Pgbouncer closes the connection which leads to
// connection thrashing when clients misbehave.
if server.in_transaction() {
server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?;
// server.query("ROLLBACK").await?;
// server.query("DISCARD ALL").await?;
// server.set_name("pgcat").await?;
// TODO: Figure out a clever way to ensure
// the server has no more messages for us.
server.mark_bad();
}
self.release();
@@ -830,9 +804,10 @@ where
self.buffer.put(&original[..]);
self.send_server_message(
server,
self.send_and_receive_loop(
code,
self.buffer.clone(),
server,
&address,
query_router.shard(),
&pool,
@@ -841,41 +816,12 @@ where
self.buffer.clear();
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
@@ -925,7 +871,6 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
@@ -941,6 +886,7 @@ where
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
self.stats.server_idle(server.process_id(), address.id);
self.release();
self.stats.client_idle(self.process_id, address.id);
}
@@ -952,6 +898,46 @@ where
guard.remove(&(self.process_id, self.secret_key));
}
async fn send_and_receive_loop(
&mut self,
code: char,
message: BytesMut,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<(), Error> {
debug!("Sending {} to server", code);
self.send_server_message(server, message, &address, shard, &pool)
.await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(server, &address, shard, &pool)
.await?;
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
Ok(())
}
async fn send_server_message(
&self,
server: &mut Server,

View File

@@ -64,6 +64,8 @@ pub struct Address {
pub database: String,
pub role: Role,
pub replica_number: usize,
pub username: String,
pub poolname: String,
}
impl Default for Address {
@@ -76,6 +78,8 @@ impl Default for Address {
replica_number: 0,
database: String::from("database"),
role: Role::Replica,
username: String::from("username"),
poolname: String::from("poolname"),
}
}
}
@@ -84,11 +88,11 @@ impl Address {
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
pub fn name(&self) -> String {
match self.role {
Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard),
Role::Primary => format!("{}_shard_{}_primary", self.poolname, self.shard),
Role::Replica => format!(
"{}_shard_{}_replica_{}",
self.database, self.shard, self.replica_number
self.poolname, self.shard, self.replica_number
),
}
}

View File

@@ -114,12 +114,14 @@ impl ConnectionPool {
let address = Address {
id: address_id,
database: pool_name.clone(),
database: shard.database.clone(),
host: server.0.clone(),
port: server.1.to_string(),
role: role,
replica_number,
shard: shard_idx.parse::<usize>().unwrap(),
username: user_info.username.clone(),
poolname: pool_name.clone(),
};
address_id += 1;
@@ -333,7 +335,7 @@ impl ConnectionPool {
if !require_healthcheck {
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone()));
}
@@ -352,7 +354,7 @@ impl ConnectionPool {
Ok(_) => {
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone()));
}