Compare commits

..

3 Commits

Author SHA1 Message Date
Lev Kokotov
d6a13d047d Drop in-transaction connections for now 2022-08-18 13:31:15 -07:00
zainkabani
5948fef6cf Minor Refactoring of re-used code and server stat reporting (#129)
* Minor changes to stats reporting and recduce re-used code

* fmt
2022-08-18 05:12:38 -07:00
Mostafa Abdelraouf
790898c20e Add pool name and username to address object (#128)
* Add pool name and username to address object

* Fix address name

* fmt
2022-08-17 08:40:47 -07:00
4 changed files with 111 additions and 151 deletions

View File

@@ -667,7 +667,6 @@ where
.client_disconnecting(self.process_id, last_address_id); .client_disconnecting(self.process_id, last_address_id);
} }
self.stats.client_active(self.process_id, address.id); self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id(), address.id);
self.last_address_id = Some(address.id); self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id()); self.last_server_id = Some(server.process_id());
@@ -731,44 +730,16 @@ where
'Q' => { 'Q' => {
debug!("Sending query to server"); debug!("Sending query to server");
self.send_server_message( self.send_and_receive_loop(
server, code,
original, original,
server,
&address, &address,
query_router.shard(), query_router.shard(),
&pool, &pool,
) )
.await?; .await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
// Send server reply to the client.
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
if !server.in_transaction() { if !server.in_transaction() {
// Report transaction executed statistics. // Report transaction executed statistics.
self.stats.transaction(self.process_id, address.id); self.stats.transaction(self.process_id, address.id);
@@ -776,7 +747,6 @@ where
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -789,9 +759,13 @@ where
// Pgbouncer closes the connection which leads to // Pgbouncer closes the connection which leads to
// connection thrashing when clients misbehave. // connection thrashing when clients misbehave.
if server.in_transaction() { if server.in_transaction() {
server.query("ROLLBACK").await?; // server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?; // server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?; // server.set_name("pgcat").await?;
// TODO: Figure out a clever way to ensure
// the server has no more messages for us.
server.mark_bad();
} }
self.release(); self.release();
@@ -830,9 +804,10 @@ where
self.buffer.put(&original[..]); self.buffer.put(&original[..]);
self.send_server_message( self.send_and_receive_loop(
server, code,
self.buffer.clone(), self.buffer.clone(),
server,
&address, &address,
query_router.shard(), query_router.shard(),
&pool, &pool,
@@ -841,41 +816,12 @@ where
self.buffer.clear(); self.buffer.clear();
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id); self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -925,7 +871,6 @@ where
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -941,6 +886,7 @@ where
// The server is no longer bound to us, we can't cancel it's queries anymore. // The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool"); debug!("Releasing server back into the pool");
self.stats.server_idle(server.process_id(), address.id);
self.release(); self.release();
self.stats.client_idle(self.process_id, address.id); self.stats.client_idle(self.process_id, address.id);
} }
@@ -952,6 +898,46 @@ where
guard.remove(&(self.process_id, self.secret_key)); guard.remove(&(self.process_id, self.secret_key));
} }
/// Send one client message to the server, then stream every buffered
/// server response back to the client, finally recording query stats.
///
/// Parameters:
/// - `code`: protocol message code, used only for the debug log line.
/// - `message`: raw bytes to forward to the server.
/// - `server`: the backend connection currently bound to this client.
/// - `address`: backend address, used for stats attribution.
/// - `shard`: shard index chosen by the query router.
/// - `pool`: pool the server was checked out from.
///
/// Errors: propagates failures from `send_server_message`,
/// `receive_server_message`, and the client-side write. A failed write
/// to the client first calls `server.mark_bad()` — presumably so the
/// pool discards the connection instead of reusing it (confirm against
/// `Server::mark_bad`).
async fn send_and_receive_loop(
&mut self,
code: char,
message: BytesMut,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<(), Error> {
debug!("Sending {} to server", code);
self.send_server_message(server, message, &address, shard, &pool)
.await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(server, &address, shard, &pool)
.await?;
// Forward the server reply to the client; on failure, poison the
// server connection before surfacing the error.
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
// Keep draining until the server has no more buffered data for us.
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
Ok(())
}
async fn send_server_message( async fn send_server_message(
&self, &self,
server: &mut Server, server: &mut Server,

View File

@@ -64,6 +64,8 @@ pub struct Address {
pub database: String, pub database: String,
pub role: Role, pub role: Role,
pub replica_number: usize, pub replica_number: usize,
pub username: String,
pub poolname: String,
} }
impl Default for Address { impl Default for Address {
@@ -76,6 +78,8 @@ impl Default for Address {
replica_number: 0, replica_number: 0,
database: String::from("database"), database: String::from("database"),
role: Role::Replica, role: Role::Replica,
username: String::from("username"),
poolname: String::from("poolname"),
} }
} }
} }
@@ -84,11 +88,11 @@ impl Address {
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`. /// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
pub fn name(&self) -> String { pub fn name(&self) -> String {
match self.role { match self.role {
Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard), Role::Primary => format!("{}_shard_{}_primary", self.poolname, self.shard),
Role::Replica => format!( Role::Replica => format!(
"{}_shard_{}_replica_{}", "{}_shard_{}_replica_{}",
self.database, self.shard, self.replica_number self.poolname, self.shard, self.replica_number
), ),
} }
} }

View File

@@ -114,12 +114,14 @@ impl ConnectionPool {
let address = Address { let address = Address {
id: address_id, id: address_id,
database: pool_name.clone(), database: shard.database.clone(),
host: server.0.clone(), host: server.0.clone(),
port: server.1.to_string(), port: server.1.to_string(),
role: role, role: role,
replica_number, replica_number,
shard: shard_idx.parse::<usize>().unwrap(), shard: shard_idx.parse::<usize>().unwrap(),
username: user_info.username.clone(),
poolname: pool_name.clone(),
}; };
address_id += 1; address_id += 1;
@@ -333,7 +335,7 @@ impl ConnectionPool {
if !require_healthcheck { if !require_healthcheck {
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id); self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
@@ -352,7 +354,7 @@ impl ConnectionPool {
Ok(_) => { Ok(_) => {
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id); self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }

View File

@@ -4,7 +4,6 @@ use log::{error, info, trace};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use std::time::SystemTime;
use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -43,26 +42,6 @@ enum EventName {
UpdateAverages, UpdateAverages,
} }
/// Send an event via the channel and log
/// an error if it fails.
fn send(tx: &Sender<Event>, event: Event) {
let name = event.name;
let result = tx.try_send(event);
match result {
Ok(_) => trace!(
"{:?} event reported successfully, capacity: {}",
name,
tx.capacity()
),
Err(err) => match err {
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
},
};
}
/// Event data sent to the collector /// Event data sent to the collector
/// from clients and servers. /// from clients and servers.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -101,6 +80,25 @@ impl Reporter {
Reporter { tx: tx } Reporter { tx: tx }
} }
/// Send statistics to the task keeping track of stats.
fn send(&self, event: Event) {
let name = event.name;
let result = self.tx.try_send(event);
match result {
Ok(_) => trace!(
"{:?} event reported successfully, capacity: {}",
name,
self.tx.capacity()
),
Err(err) => match err {
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
},
};
}
/// Report a query executed by a client against /// Report a query executed by a client against
/// a server identified by the `address_id`. /// a server identified by the `address_id`.
pub fn query(&self, process_id: i32, address_id: usize) { pub fn query(&self, process_id: i32, address_id: usize) {
@@ -111,7 +109,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event); self.send(event);
} }
/// Report a transaction executed by a client against /// Report a transaction executed by a client against
@@ -124,7 +122,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Report data sent to a server identified by `address_id`. /// Report data sent to a server identified by `address_id`.
@@ -137,7 +135,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Report data received from a server identified by `address_id`. /// Report data received from a server identified by `address_id`.
@@ -150,7 +148,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Time spent waiting to get a healthy connection from the pool /// Time spent waiting to get a healthy connection from the pool
@@ -164,7 +162,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a client identified by `process_id` waiting for a connection /// Reports a client identified by `process_id` waiting for a connection
@@ -177,7 +175,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a client identified by `process_id` is done waiting for a connection /// Reports a client identified by `process_id` is done waiting for a connection
@@ -190,7 +188,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a client identified by `process_id` is done querying the server /// Reports a client identified by `process_id` is done querying the server
@@ -203,7 +201,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a client identified by `process_id` is disconecting from the pooler. /// Reports a client identified by `process_id` is disconecting from the pooler.
@@ -216,7 +214,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -230,7 +228,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -244,7 +242,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -258,7 +256,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -272,7 +270,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
/// Reports a server connection identified by `process_id` is disconecting from the pooler. /// Reports a server connection identified by `process_id` is disconecting from the pooler.
@@ -285,7 +283,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
send(&self.tx, event) self.send(event)
} }
} }
@@ -347,9 +345,6 @@ impl Collector {
// Track which state the client and server are at any given time. // Track which state the client and server are at any given time.
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new(); let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
// Average update times
let mut last_updated_avg: HashMap<usize, SystemTime> = HashMap::new();
// Flush stats to StatsD and calculate averages every 15 seconds. // Flush stats to StatsD and calculate averages every 15 seconds.
let tx = self.tx.clone(); let tx = self.tx.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
@@ -359,15 +354,12 @@ impl Collector {
interval.tick().await; interval.tick().await;
let address_count = get_number_of_addresses(); let address_count = get_number_of_addresses();
for address_id in 0..address_count { for address_id in 0..address_count {
send( let _ = tx.try_send(Event {
&tx, name: EventName::UpdateStats,
Event { value: 0,
name: EventName::UpdateStats, process_id: -1,
value: 0, address_id: address_id,
process_id: -1, });
address_id: address_id,
},
);
} }
} }
}); });
@@ -380,15 +372,12 @@ impl Collector {
interval.tick().await; interval.tick().await;
let address_count = get_number_of_addresses(); let address_count = get_number_of_addresses();
for address_id in 0..address_count { for address_id in 0..address_count {
send( let _ = tx.try_send(Event {
&tx, name: EventName::UpdateAverages,
Event { value: 0,
name: EventName::UpdateAverages, process_id: -1,
value: 0, address_id: address_id,
process_id: -1, });
address_id: address_id,
},
);
} }
} }
}); });
@@ -410,9 +399,6 @@ impl Collector {
.entry(stat.address_id) .entry(stat.address_id)
.or_insert(HashMap::new()); .or_insert(HashMap::new());
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new()); let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
let last_updated_avg = last_updated_avg
.entry(stat.address_id)
.or_insert(SystemTime::now());
// Some are counters, some are gauges... // Some are counters, some are gauges...
match stat.name { match stat.name {
@@ -538,24 +524,6 @@ impl Collector {
} }
EventName::UpdateAverages => { EventName::UpdateAverages => {
let elapsed = match last_updated_avg.elapsed() {
Ok(elapsed) => elapsed.as_secs(),
Err(err) => {
error!(
"Could not get elapsed time, averages may be incorrect: {:?}",
err
);
STAT_PERIOD / 1_000
}
} as i64;
*last_updated_avg = SystemTime::now();
// Tokio triggers the interval on first tick and then sleeps.
if elapsed == 0 {
continue;
}
// Calculate averages // Calculate averages
for stat in &[ for stat in &[
"avg_query_count", "avg_query_count",
@@ -573,7 +541,7 @@ impl Collector {
let old_value = old_stats.entry(total_name.clone()).or_insert(0); let old_value = old_stats.entry(total_name.clone()).or_insert(0);
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
let avg = (new_value - *old_value) / elapsed; // Avg / second let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
stats.insert(stat, avg); stats.insert(stat, avg);
*old_value = new_value; *old_value = new_value;