From af1716bcd7a5f977e8f0cabc6ce3e0fbe96093d8 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Tue, 22 Feb 2022 18:10:30 -0800 Subject: [PATCH] Flush stats (#38) * flush stats * stats * refactor --- README.md | 34 ++++----- src/main.rs | 4 +- src/stats.rs | 199 ++++++++++++++++++++++++++++----------------------- 3 files changed, 128 insertions(+), 109 deletions(-) diff --git a/README.md b/README.md index b661737..b05eaaf 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,14 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su ## Features | **Feature** | **Status** | **Comments** | |--------------------------------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| -| Transaction pooling | :heavy_check_mark: | Identical to PgBouncer. | -| Session pooling | :heavy_check_mark: | Identical to PgBouncer. | -| `COPY` support | :heavy_check_mark: | Both `COPY TO` and `COPY FROM` are supported. | -| Query cancellation | :heavy_check_mark: | Supported both in transaction and session pooling modes. | -| Load balancing of read queries | :heavy_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). | -| Sharding | :heavy_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. | -| Failover | :heavy_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. | -| Statistics reporting | :heavy_check_mark: | Statistics similar to PgBouncers are reported via StatsD. | +| Transaction pooling | :white_check_mark: | Identical to PgBouncer. | +| Session pooling | :white_check_mark: | Identical to PgBouncer. | +| `COPY` support | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. | +| Query cancellation | :white_check_mark: | Supported both in transaction and session pooling modes. | +| Load balancing of read queries | :white_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). | +| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. | +| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. | +| Statistics reporting | :white_check_mark: | Statistics similar to PgBouncers are reported via StatsD. | | Live configuration reloading | :construction_worker: | Reload config with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)`. Not all settings can be reloaded without a restart. | | Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. | @@ -75,15 +75,15 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing. | **Feature** | **Tested in CI** | **Tested manually** | **Comments** | |-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------| -| Transaction pooling | :heavy_check_mark: | :heavy_check_mark: | Used by default for all tests. | -| Session pooling | :heavy_check_mark: | :heavy_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. | -| `COPY` | :heavy_check_mark: | :heavy_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. | -| Query cancellation | :heavy_check_mark: | :heavy_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. | -| Load balancing | :x: | :heavy_check_mark: | We could test this by emitting statistics for each replica and compare them. | -| Failover | :x: | :heavy_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. | -| Sharding | :heavy_check_mark: | :heavy_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. | -| Statistics reporting | :x: | :heavy_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. | -| Live config reloading | :heavy_check_mark: | :heavy_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. | +| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. | +| Session pooling | :white_check_mark: | :white_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. | +| `COPY` | :white_check_mark: | :white_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. | +| Query cancellation | :white_check_mark: | :white_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. | +| Load balancing | :x: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. | +| Failover | :x: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. | +| Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. | +| Statistics reporting | :x: | :white_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. | +| Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. | ## Usage diff --git a/src/main.rs b/src/main.rs index 72767c1..21da9af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -111,9 +111,9 @@ async fn main() { // Collect statistics and send them to StatsD let (tx, rx) = mpsc::channel(100); - + let collector_tx = tx.clone(); tokio::task::spawn(async move { - let mut stats_collector = Collector::new(rx); + let mut stats_collector = Collector::new(rx, collector_tx); stats_collector.collect().await; }); diff --git a/src/stats.rs b/src/stats.rs index 0332b5f..384aa1a 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -4,7 +4,6 @@ use statsd::Client; use tokio::sync::mpsc::{Receiver, Sender}; use std::collections::HashMap; -use std::time::Instant; use crate::config::get_config; @@ -24,6 +23,7 @@ enum EventName { ServerTested, ServerLogin, ServerDisconnecting, + FlushStatsToStatsD, } #[derive(Debug)] @@ -44,155 +44,167 @@ impl Reporter { } pub fn query(&self) { - let statistic = Event { + let event = Event { name: EventName::Query, value: 1, process_id: None, }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn transaction(&self) { - let statistic = Event { + let event = Event { name: EventName::Transaction, value: 1, process_id: None, }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn data_sent(&self, amount: usize) { - let statistic = Event { + let event = Event { name: EventName::DataSent, value: amount as i64, process_id: None, }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn data_received(&self, amount: usize) { - let statistic = Event { + let event = Event { name: EventName::DataReceived, value: amount as i64, process_id: None, }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn checkout_time(&self, ms: u128) { - let statistic = Event { + let event = Event { name: EventName::CheckoutTime, value: ms as i64, process_id: None, }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn client_waiting(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ClientWaiting, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn client_active(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ClientActive, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn client_idle(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ClientIdle, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn client_disconnecting(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ClientDisconnecting, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn server_active(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ServerActive, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn server_idle(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ServerIdle, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn server_login(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ServerLogin, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn server_tested(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ServerTested, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } pub fn server_disconnecting(&self, process_id: i32) { - let statistic = Event { + let event = Event { name: EventName::ServerDisconnecting, value: 1, process_id: Some(process_id), }; - let _ = self.tx.try_send(statistic); + let _ = self.tx.try_send(event); } + + // pub fn flush_to_statsd(&self) { + // let event = Event { + // name: EventName::FlushStatsToStatsD, + // value: 0, + // process_id: None, + // }; + + // let _ = self.tx.try_send(event); + // } } pub struct Collector { rx: Receiver, + tx: Sender, client: Client, } impl Collector { - pub fn new(rx: Receiver) -> Collector { + pub fn new(rx: Receiver, tx: Sender) -> Collector { Collector { - rx: rx, + rx, + tx, client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(), } } @@ -218,8 +230,19 @@ impl Collector { ]); let mut client_server_states: HashMap = HashMap::new(); + let tx = self.tx.clone(); - let mut now = Instant::now(); + tokio::task::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000)); + loop { + interval.tick().await; + let _ = tx.try_send(Event { + name: EventName::FlushStatsToStatsD, + value: 0, + process_id: None, + }); + } + }); loop { let stat = match self.rx.recv().await { @@ -284,65 +307,61 @@ impl Collector { EventName::ClientDisconnecting | EventName::ServerDisconnecting => { client_server_states.remove(&stat.process_id.unwrap()); } + + EventName::FlushStatsToStatsD => { + for (_, state) in &client_server_states { + match state { + EventName::ClientActive => { + let counter = stats.entry("cl_active").or_insert(0); + *counter += 1; + } + + EventName::ClientWaiting => { + let counter = stats.entry("cl_waiting").or_insert(0); + *counter += 1; + } + + EventName::ClientIdle => { + let counter = stats.entry("cl_idle").or_insert(0); + *counter += 1; + } + + EventName::ServerIdle => { + let counter = stats.entry("sv_idle").or_insert(0); + *counter += 1; + } + + EventName::ServerActive => { + let counter = stats.entry("sv_active").or_insert(0); + *counter += 1; + } + + EventName::ServerTested => { + let counter = stats.entry("sv_tested").or_insert(0); + *counter += 1; + } + + EventName::ServerLogin => { + let counter = stats.entry("sv_login").or_insert(0); + *counter += 1; + } + + _ => unreachable!(), + }; + } + + info!("{:?}", stats); + + let mut pipeline = self.client.pipeline(); + + for (key, value) in stats.iter_mut() { + pipeline.gauge(key, *value as f64); + *value = 0; + } + + pipeline.send(&self.client); + } }; - - // It's been 15 seconds. If there is no traffic, it won't publish anything, - // but it also doesn't matter then. - if now.elapsed().as_secs() > 15 { - for (_, state) in &client_server_states { - match state { - EventName::ClientActive => { - let counter = stats.entry("cl_active").or_insert(0); - *counter += 1; - } - - EventName::ClientWaiting => { - let counter = stats.entry("cl_waiting").or_insert(0); - *counter += 1; - } - - EventName::ClientIdle => { - let counter = stats.entry("cl_idle").or_insert(0); - *counter += 1; - } - - EventName::ServerIdle => { - let counter = stats.entry("sv_idle").or_insert(0); - *counter += 1; - } - - EventName::ServerActive => { - let counter = stats.entry("sv_active").or_insert(0); - *counter += 1; - } - - EventName::ServerTested => { - let counter = stats.entry("sv_tested").or_insert(0); - *counter += 1; - } - - EventName::ServerLogin => { - let counter = stats.entry("sv_login").or_insert(0); - *counter += 1; - } - - _ => unreachable!(), - }; - } - - info!("{:?}", stats); - - let mut pipeline = self.client.pipeline(); - - for (key, value) in stats.iter_mut() { - pipeline.gauge(key, *value as f64); - *value = 0; - } - - pipeline.send(&self.client); - - now = Instant::now(); - } } } }