diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 12d19a4..9dbca9d 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -13,6 +13,9 @@ function start_pgcat() { # Setup the database with shards and user psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql +PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i +PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i +PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i # Install Toxiproxy to simulate a downed/slow database wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb @@ -28,9 +31,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica start_pgcat "info" # pgbench test -pgbench -i -h 127.0.0.1 -p 6432 && \ - pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \ - pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended +pgbench -i -h 127.0.0.1 -p 6432 +pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql +pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended # COPY TO STDOUT test psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null diff --git a/Cargo.lock b/Cargo.lock index d6e42cb..7ee2138 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -371,7 +371,6 @@ dependencies = [ "serde_derive", "sha-1", "sqlparser", - "statsd", "tokio", "toml", ] @@ -542,15 +541,6 @@ dependencies = [ "log", ] -[[package]] -name = "statsd" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df1efceb4bf2c0b5ebec94354285a43bbbed1375605bdf2ebe4132299434a330" -dependencies = [ - "rand", -] - [[package]] name = "syn" version = "1.0.86" diff --git a/Cargo.toml b/Cargo.toml index d070c61..74d4a76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ serde_derive = "1" regex = "1" num_cpus = "1" 
once_cell = "1" -statsd = "0.15" sqlparser = "0.14" log = "0.4" arc-swap = "1" diff --git a/src/admin.rs b/src/admin.rs index 39fb3be..8f94b5d 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -31,7 +31,7 @@ pub async fn handle_admin( if query.starts_with("SHOW STATS") { trace!("SHOW STATS"); - show_stats(stream).await + show_stats(stream, &pool).await } else if query.starts_with("RELOAD") { trace!("RELOAD"); reload(stream).await @@ -77,11 +77,19 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul ])); // but admin tools that work with pgbouncer want this res.put(data_row(&vec![ "free_clients".to_string(), - stats["cl_idle"].to_string(), + stats + .keys() + .map(|address_id| stats[&address_id]["cl_idle"]) + .sum::() + .to_string(), ])); res.put(data_row(&vec![ "used_clients".to_string(), - stats["cl_active"].to_string(), + stats + .keys() + .map(|address_id| stats[&address_id]["cl_active"]) + .sum::() + .to_string(), ])); res.put(data_row(&vec![ "login_clients".to_string(), @@ -89,11 +97,19 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul ])); res.put(data_row(&vec![ "free_servers".to_string(), - stats["sv_idle"].to_string(), + stats + .keys() + .map(|address_id| stats[&address_id]["sv_idle"]) + .sum::() + .to_string(), ])); res.put(data_row(&vec![ "used_servers".to_string(), - stats["sv_active"].to_string(), + stats + .keys() + .map(|address_id| stats[&address_id]["sv_active"]) + .sum::() + .to_string(), ])); res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()])); res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()])); @@ -125,7 +141,7 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> { } /// SHOW POOLS -async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Result<(), Error> { +async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { let stats = get_stats(); let config = { let guard = 
get_config(); @@ -151,16 +167,26 @@ async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Resu let mut res = BytesMut::new(); res.put(row_description(&columns)); - let mut row = vec![String::from("all"), config.user.name.clone()]; + for shard in 0..pool.shards() { + for server in 0..pool.servers(shard) { + let address = pool.address(shard, server); + let stats = match stats.get(&address.id) { + Some(stats) => stats.clone(), + None => HashMap::new(), + }; - for column in &columns[2..columns.len() - 1] { - let value = stats.get(column.0).unwrap_or(&0).to_string(); - row.push(value); + let mut row = vec![address.name(), config.user.name.clone()]; + + for column in &columns[2..columns.len() - 1] { + let value = stats.get(column.0).unwrap_or(&0).to_string(); + row.push(value); + } + + row.push(config.general.pool_mode.to_string()); + res.put(data_row(&row)); + } } - row.push(config.general.pool_mode.to_string()); - - res.put(data_row(&row)); res.put(command_complete("SHOW")); res.put_u8(b'Z'); @@ -309,7 +335,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> { } /// SHOW STATS -async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { let columns = vec![ ("database", DataType::Text), ("total_xact_count", DataType::Numeric), @@ -332,15 +358,24 @@ async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> { let mut res = BytesMut::new(); res.put(row_description(&columns)); - let mut row = vec![ - String::from("all"), // TODO: per-database stats, - ]; + for shard in 0..pool.shards() { + for server in 0..pool.servers(shard) { + let address = pool.address(shard, server); + let stats = match stats.get(&address.id) { + Some(stats) => stats.clone(), + None => HashMap::new(), + }; - for column in &columns[1..] 
{ - row.push(stats.get(column.0).unwrap_or(&0).to_string()); + let mut row = vec![address.name()]; + + for column in &columns[1..] { + row.push(stats.get(column.0).unwrap_or(&0).to_string()); + } + + res.put(data_row(&row)); + } } - res.put(data_row(&row)); res.put(command_complete("SHOW")); res.put_u8(b'Z'); diff --git a/src/client.rs b/src/client.rs index f73ef22..0fc9c47 100644 --- a/src/client.rs +++ b/src/client.rs @@ -58,6 +58,12 @@ pub struct Client { // Clients want to talk to admin admin: bool, + + // Last address the client talked to + last_address_id: Option, + + // Last server process id we talked to + last_server_id: Option, } impl Client { @@ -147,6 +153,8 @@ impl Client { parameters: parameters, stats: stats, admin: admin, + last_address_id: None, + last_server_id: None, }); } @@ -169,6 +177,8 @@ impl Client { parameters: HashMap::new(), stats: stats, admin: false, + last_address_id: None, + last_server_id: None, }); } @@ -219,9 +229,6 @@ impl Client { loop { trace!("Client idle, waiting for message"); - // Client idle, waiting for messages. - self.stats.client_idle(self.process_id); - // Read a complete message from the client, which normally would be // either a `Q` (query) or `P` (prepare, extended protocol). // We can parse it here before grabbing a server from the pool, @@ -292,13 +299,13 @@ impl Client { continue; } - // Waiting for server connection. - self.stats.client_waiting(self.process_id); - debug!("Waiting for connection from pool"); // Grab a server from the pool: the client issued a regular query. 
- let connection = match pool.get(query_router.shard(), query_router.role()).await { + let connection = match pool + .get(query_router.shard(), query_router.role(), self.process_id) + .await + { Ok(conn) => { debug!("Got connection from pool"); conn @@ -312,15 +319,23 @@ impl Client { }; let mut reference = connection.0; - let _address = connection.1; + let address = connection.1; let server = &mut *reference; // Claim this server as mine for query cancellation. server.claim(self.process_id, self.secret_key); + // "disconnect" from the previous server stats-wise + if let Some(last_address_id) = self.last_address_id { + self.stats + .client_disconnecting(self.process_id, last_address_id); + } + // Client active & server active - self.stats.client_active(self.process_id); - self.stats.server_active(server.process_id()); + self.stats.client_active(self.process_id, address.id); + self.stats.server_active(server.process_id(), address.id); + self.last_address_id = Some(address.id); + self.last_server_id = Some(server.process_id()); debug!( "Client {:?} talking to server {:?}", @@ -392,17 +407,17 @@ impl Client { } // Report query executed statistics. - self.stats.query(); + self.stats.query(address.id); // The transaction is over, we can release the connection back to the pool. if !server.in_transaction() { // Report transaction executed statistics. - self.stats.transaction(); + self.stats.transaction(address.id); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if self.transaction_mode { - self.stats.server_idle(server.process_id()); + self.stats.server_idle(server.process_id(), address.id); break; } } @@ -478,15 +493,15 @@ impl Client { } // Report query executed statistics. - self.stats.query(); + self.stats.query(address.id); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. 
if !server.in_transaction() { - self.stats.transaction(); + self.stats.transaction(address.id); if self.transaction_mode { - self.stats.server_idle(server.process_id()); + self.stats.server_idle(server.process_id(), address.id); break; } } @@ -517,10 +532,10 @@ impl Client { // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. if !server.in_transaction() { - self.stats.transaction(); + self.stats.transaction(address.id); if self.transaction_mode { - self.stats.server_idle(server.process_id()); + self.stats.server_idle(server.process_id(), address.id); break; } } @@ -537,6 +552,7 @@ impl Client { // The server is no longer bound to us, we can't cancel it's queries anymore. debug!("Releasing server back into the pool"); self.release(); + self.stats.client_idle(self.process_id, address.id); } } @@ -549,6 +565,14 @@ impl Client { impl Drop for Client { fn drop(&mut self) { - self.stats.client_disconnecting(self.process_id); + // Disconnect the client + if let Some(address_id) = self.last_address_id { + self.stats.client_disconnecting(self.process_id, address_id); + + // The server is now idle + if let Some(process_id) = self.last_server_id { + self.stats.server_idle(process_id, address_id); + } + } } } diff --git a/src/config.rs b/src/config.rs index 663a9f4..cd8b2eb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -48,6 +48,7 @@ impl PartialEq for Option { #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] pub struct Address { + pub id: usize, pub host: String, pub port: String, pub shard: usize, @@ -58,6 +59,7 @@ pub struct Address { impl Default for Address { fn default() -> Address { Address { + id: 0, host: String::from("127.0.0.1"), port: String::from("5432"), shard: 0, diff --git a/src/main.rs b/src/main.rs index a41d5ec..ada4551 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,6 @@ extern crate once_cell; extern crate serde; extern crate serde_derive; extern 
crate sqlparser; -extern crate statsd; extern crate tokio; extern crate toml; @@ -113,15 +112,19 @@ async fn main() { // Collect statistics and send them to StatsD let (tx, rx) = mpsc::channel(100); - let collector_tx = tx.clone(); - tokio::task::spawn(async move { - let mut stats_collector = Collector::new(rx, collector_tx); - stats_collector.collect().await; - }); + // Connection pool for all shards and replicas let mut pool = ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await; + let collector_tx = tx.clone(); + let addresses = pool.databases(); + tokio::task::spawn(async move { + let mut stats_collector = Collector::new(rx, collector_tx); + stats_collector.collect(addresses).await; + }); + + // Connect to all servers and validate their versions. let server_info = match pool.validate().await { Ok(info) => info, Err(err) => { diff --git a/src/pool.rs b/src/pool.rs index b8a0ecf..b534842 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -38,6 +38,7 @@ impl ConnectionPool { let mut shards = Vec::new(); let mut addresses = Vec::new(); let mut banlist = Vec::new(); + let mut address_id = 0; let mut shard_ids = config .shards .clone() @@ -63,6 +64,7 @@ impl ConnectionPool { }; let address = Address { + id: address_id, host: server.0.clone(), port: server.1.to_string(), role: role, @@ -70,6 +72,8 @@ impl ConnectionPool { shard: shard_idx.parse::().unwrap(), }; + address_id += 1; + if role == Role::Replica { replica_number += 1; } @@ -121,9 +125,13 @@ impl ConnectionPool { pub async fn validate(&mut self) -> Result { let mut server_infos = Vec::new(); + let stats = self.stats.clone(); for shard in 0..self.shards() { for _ in 0..self.servers(shard) { - let connection = match self.get(shard, None).await { + // To keep stats consistent. 
+ let fake_process_id = 0; + + let connection = match self.get(shard, None, fake_process_id).await { Ok(conn) => conn, Err(err) => { error!("Shard {} down or misconfigured: {:?}", shard, err); @@ -137,6 +145,8 @@ impl ConnectionPool { let server_info = server.server_info(); + stats.client_disconnecting(fake_process_id, address.id); + if server_infos.len() > 0 { // Compare against the last server checked. if server_info != server_infos[server_infos.len() - 1] { @@ -165,6 +175,7 @@ impl ConnectionPool { &mut self, shard: usize, role: Option, + process_id: i32, ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let now = Instant::now(); let addresses = &self.addresses[shard]; @@ -200,6 +211,8 @@ impl ConnectionPool { let index = self.round_robin % addresses.len(); let address = &addresses[index]; + self.stats.client_waiting(process_id, address.id); + // Make sure you're getting a primary or a replica // as per request. If no specific role is requested, the first // available will be chosen. 
@@ -219,6 +232,9 @@ impl ConnectionPool { Err(err) => { error!("Banning replica {}, error: {:?}", index, err); self.ban(address, shard); + self.stats.client_disconnecting(process_id, address.id); + self.stats + .checkout_time(now.elapsed().as_micros(), address.id); continue; } }; @@ -227,7 +243,7 @@ impl ConnectionPool { let server = &mut *conn; let healthcheck_timeout = get_config().general.healthcheck_timeout; - self.stats.server_tested(server.process_id()); + self.stats.server_tested(server.process_id(), address.id); match tokio::time::timeout( tokio::time::Duration::from_millis(healthcheck_timeout), @@ -238,8 +254,9 @@ impl ConnectionPool { // Check if health check succeeded Ok(res) => match res { Ok(_) => { - self.stats.checkout_time(now.elapsed().as_micros()); - self.stats.server_idle(conn.process_id()); + self.stats + .checkout_time(now.elapsed().as_micros(), address.id); + self.stats.server_idle(conn.process_id(), address.id); return Ok((conn, address.clone())); } Err(_) => { @@ -248,6 +265,9 @@ impl ConnectionPool { server.mark_bad(); self.ban(address, shard); + self.stats.client_disconnecting(process_id, address.id); + self.stats + .checkout_time(now.elapsed().as_micros(), address.id); continue; } }, @@ -258,6 +278,9 @@ impl ConnectionPool { server.mark_bad(); self.ban(address, shard); + self.stats.client_disconnecting(process_id, address.id); + self.stats + .checkout_time(now.elapsed().as_micros(), address.id); continue; } } @@ -395,13 +418,14 @@ impl ManageConnection for ServerPool { async fn connect(&self) -> Result { info!( "Creating a new connection to {:?} using user {:?}", - self.address, self.user.name + self.address.name(), + self.user.name ); // Put a temporary process_id into the stats // for server login. 
let process_id = rand::random::(); - self.stats.server_login(process_id); + self.stats.server_login(process_id, self.address.id); match Server::startup( &self.address, @@ -414,12 +438,12 @@ impl ManageConnection for ServerPool { { Ok(conn) => { // Remove the temporary process_id from the stats. - self.stats.server_disconnecting(process_id); + self.stats.server_disconnecting(process_id, self.address.id); Ok(conn) } Err(err) => { // Remove the temporary process_id from the stats. - self.stats.server_disconnecting(process_id); + self.stats.server_disconnecting(process_id, self.address.id); Err(err) } } diff --git a/src/query_router.rs b/src/query_router.rs index 737c4d1..daa9171 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -12,7 +12,7 @@ use sqlparser::parser::Parser; const CUSTOM_SQL_REGEXES: [&str; 5] = [ r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$", - r"(?i)^ *SET SHARD TO '?([0-9]+)'? *;? *$", + r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$", r"(?i)^ *SHOW SHARD *;? *$", r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$", r"(?i)^ *SHOW SERVER ROLE *;? *$", @@ -192,7 +192,10 @@ impl QueryRouter { } Command::SetShard => { - self.active_shard = Some(value.parse::().unwrap()); + self.active_shard = match value.to_ascii_uppercase().as_ref() { + "ANY" => Some(rand::random::() % self.shards), + _ => Some(value.parse::().unwrap()), + }; } Command::SetServerRole => { diff --git a/src/server.rs b/src/server.rs index 934fc95..4ef9422 100644 --- a/src/server.rs +++ b/src/server.rs @@ -268,7 +268,7 @@ impl Server { /// Send messages to the server from the client. pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { - self.stats.data_sent(messages.len()); + self.stats.data_sent(messages.len(), self.address.id); match write_all_half(&mut self.write, messages).await { Ok(_) => Ok(()), @@ -374,7 +374,7 @@ impl Server { let bytes = self.buffer.clone(); // Keep track of how much data we got from the server for stats. 
- self.stats.data_received(bytes.len()); + self.stats.data_received(bytes.len(), self.address.id); // Clear the buffer for next query. self.buffer.clear(); @@ -470,7 +470,8 @@ impl Drop for Server { /// the socket is in non-blocking mode, so it may not be ready /// for a write. fn drop(&mut self) { - self.stats.server_disconnecting(self.process_id()); + self.stats + .server_disconnecting(self.process_id(), self.address.id); let mut bytes = BytesMut::with_capacity(4); bytes.put_u8(b'X'); diff --git a/src/stats.rs b/src/stats.rs index 99f709d..7d4b025 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,15 +1,13 @@ -use log::{debug, info}; +use log::info; use once_cell::sync::Lazy; use parking_lot::Mutex; -use statsd::Client; use tokio::sync::mpsc::{Receiver, Sender}; use std::collections::HashMap; -use crate::config::get_config; - // Stats used in SHOW STATS -static LATEST_STATS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +static LATEST_STATS: Lazy>>> = + Lazy::new(|| Mutex::new(HashMap::new())); static STAT_PERIOD: u64 = 15000; //15 seconds #[derive(Debug, Clone, Copy)] @@ -28,7 +26,8 @@ enum EventName { ServerTested, ServerLogin, ServerDisconnecting, - FlushStatsToStatsD, + UpdateStats, + UpdateAverages, } #[derive(Debug)] @@ -36,6 +35,7 @@ pub struct Event { name: EventName, value: i64, process_id: Option, + address_id: usize, } #[derive(Clone, Debug)] @@ -48,141 +48,155 @@ impl Reporter { Reporter { tx: tx } } - pub fn query(&self) { + pub fn query(&self, address_id: usize) { let event = Event { name: EventName::Query, value: 1, process_id: None, + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn transaction(&self) { + pub fn transaction(&self, address_id: usize) { let event = Event { name: EventName::Transaction, value: 1, process_id: None, + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn data_sent(&self, amount: usize) { + pub fn data_sent(&self, amount: usize, address_id: usize) { let event = Event { name: 
EventName::DataSent, value: amount as i64, process_id: None, + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn data_received(&self, amount: usize) { + pub fn data_received(&self, amount: usize, address_id: usize) { let event = Event { name: EventName::DataReceived, value: amount as i64, process_id: None, + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn checkout_time(&self, ms: u128) { + pub fn checkout_time(&self, ms: u128, address_id: usize) { let event = Event { name: EventName::CheckoutTime, value: ms as i64, process_id: None, + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn client_waiting(&self, process_id: i32) { + pub fn client_waiting(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientWaiting, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn client_active(&self, process_id: i32) { + pub fn client_active(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientActive, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn client_idle(&self, process_id: i32) { + pub fn client_idle(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientIdle, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn client_disconnecting(&self, process_id: i32) { + pub fn client_disconnecting(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ClientDisconnecting, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn server_active(&self, process_id: i32) { + pub fn server_active(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerActive, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = 
self.tx.try_send(event); } - pub fn server_idle(&self, process_id: i32) { + pub fn server_idle(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerIdle, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn server_login(&self, process_id: i32) { + pub fn server_login(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerLogin, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn server_tested(&self, process_id: i32) { + pub fn server_tested(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerTested, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); } - pub fn server_disconnecting(&self, process_id: i32) { + pub fn server_disconnecting(&self, process_id: i32, address_id: usize) { let event = Event { name: EventName::ServerDisconnecting, value: 1, process_id: Some(process_id), + address_id: address_id, }; let _ = self.tx.try_send(event); @@ -192,22 +206,17 @@ impl Reporter { pub struct Collector { rx: Receiver, tx: Sender, - client: Client, } impl Collector { pub fn new(rx: Receiver, tx: Sender) -> Collector { - Collector { - rx, - tx, - client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(), - } + Collector { rx, tx } } - pub async fn collect(&mut self) { + pub async fn collect(&mut self, addresses: usize) { info!("Events reporter started"); - let mut stats = HashMap::from([ + let stats_template = HashMap::from([ ("total_query_count", 0), ("total_xact_count", 0), ("total_sent", 0), @@ -232,25 +241,47 @@ impl Collector { ("sv_tested", 0), ]); + let mut stats = HashMap::new(); + // Stats saved after each iteration of the flush event. Used in calculation // of averages in the last flush period. 
- let mut old_stats: HashMap = HashMap::new(); + let mut old_stats: HashMap> = HashMap::new(); // Track which state the client and server are at any given time. - let mut client_server_states: HashMap = HashMap::new(); + let mut client_server_states: HashMap> = HashMap::new(); // Flush stats to StatsD and calculate averages every 15 seconds. + let tx = self.tx.clone(); + tokio::task::spawn(async move { + let mut interval = + tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15)); + loop { + interval.tick().await; + for address_id in 0..addresses { + let _ = tx.try_send(Event { + name: EventName::UpdateStats, + value: 0, + process_id: None, + address_id: address_id, + }); + } + } + }); + let tx = self.tx.clone(); tokio::task::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD)); loop { interval.tick().await; - let _ = tx.try_send(Event { - name: EventName::FlushStatsToStatsD, - value: 0, - process_id: None, - }); + for address_id in 0..addresses { + let _ = tx.try_send(Event { + name: EventName::UpdateAverages, + value: 0, + process_id: None, + address_id: address_id, + }); + } } }); @@ -264,6 +295,14 @@ impl Collector { } }; + let stats = stats + .entry(stat.address_id) + .or_insert(stats_template.clone()); + let client_server_states = client_server_states + .entry(stat.address_id) + .or_insert(HashMap::new()); + let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new()); + // Some are counters, some are gauges... 
match stat.name { EventName::Query => { @@ -320,9 +359,9 @@ impl Collector { client_server_states.remove(&stat.process_id.unwrap()); } - EventName::FlushStatsToStatsD => { + EventName::UpdateStats => { // Calculate connection states - for (_, state) in &client_server_states { + for (_, state) in client_server_states.iter() { match state { EventName::ClientActive => { let counter = stats.entry("cl_active").or_insert(0); @@ -334,11 +373,6 @@ impl Collector { *counter += 1; } - EventName::ClientIdle => { - let counter = stats.entry("cl_idle").or_insert(0); - *counter += 1; - } - EventName::ServerIdle => { let counter = stats.entry("sv_idle").or_insert(0); *counter += 1; @@ -359,39 +393,20 @@ impl Collector { *counter += 1; } + EventName::ClientIdle => { + let counter = stats.entry("cl_idle").or_insert(0); + *counter += 1; + } + _ => unreachable!(), }; } - // Calculate averages - for stat in &[ - "avg_query_count", - "avgxact_count", - "avg_sent", - "avg_received", - "avg_wait_time", - ] { - let total_name = stat.replace("avg_", "total_"); - let old_value = old_stats.entry(total_name.clone()).or_insert(0); - let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); - let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second - - stats.insert(stat, avg); - *old_value = new_value; - } - - debug!("{:?}", stats); - // Update latest stats used in SHOW STATS let mut guard = LATEST_STATS.lock(); - for (key, value) in &stats { - guard.insert(key.to_string(), value.clone()); - } - - let mut pipeline = self.client.pipeline(); - for (key, value) in stats.iter() { - pipeline.gauge(key, *value as f64); + let entry = guard.entry(stat.address_id).or_insert(HashMap::new()); + entry.insert(key.to_string(), value.clone()); } // These are re-calculated every iteration of the loop, so we don't want to add values @@ -409,14 +424,31 @@ impl Collector { ] { stats.insert(stat, 0); } + } - pipeline.send(&self.client); + EventName::UpdateAverages => { + // 
Calculate averages + for stat in &[ + "avg_query_count", + "avg_xact_count", + "avg_sent", + "avg_received", + "avg_wait_time", + ] { + let total_name = stat.replace("avg_", "total_"); + let old_value = old_stats.entry(total_name.clone()).or_insert(0); + let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); + let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second + + stats.insert(stat, avg); + *old_value = new_value; + } + } } }; } } } -pub fn get_stats() -> HashMap<String, i64> { +pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> { LATEST_STATS.lock().clone() } diff --git a/tests/pgbench/simple.sql b/tests/pgbench/simple.sql new file mode 100644 index 0000000..0a283ba --- /dev/null +++ b/tests/pgbench/simple.sql @@ -0,0 +1,28 @@ + +-- \setrandom aid 1 :naccounts +\set aid random(1, 100000) +-- \setrandom bid 1 :nbranches +\set bid random(1, 100000) +-- \setrandom tid 1 :ntellers +\set tid random(1, 100000) +-- \setrandom delta -5000 5000 +\set delta random(-5000,5000) + +\set shard random(0, 2) + +SET SHARD TO :shard; + +BEGIN; + +UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid; + +SELECT abalance FROM pgbench_accounts WHERE aid = :aid; + +UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid; + +UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid; + +INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP); + +END; +