Some stats (#19)

This commit is contained in:
Lev Kokotov
2022-02-14 10:00:55 -08:00
committed by GitHub
parent 574ebe02b8
commit 8e5e28a139
7 changed files with 325 additions and 25 deletions

View File

@@ -16,6 +16,7 @@ use crate::messages::*;
use crate::pool::{ClientServerMap, ConnectionPool};
use crate::server::Server;
use crate::sharding::Sharder;
use crate::stats::Reporter;
pub const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+';";
pub const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)';";
@@ -57,6 +58,9 @@ pub struct Client {
// Client parameters, e.g. user, client_encoding, etc.
parameters: HashMap<String, String>,
// Statistics
stats: Reporter,
}
impl Client {
@@ -69,6 +73,7 @@ impl Client {
transaction_mode: bool,
default_server_role: Option<Role>,
server_info: BytesMut,
stats: Reporter,
) -> Result<Client, Error> {
loop {
// Could be StartupMessage or SSLRequest
@@ -127,6 +132,7 @@ impl Client {
client_server_map: client_server_map,
default_server_role: default_server_role,
parameters: parameters,
stats: stats,
});
}
@@ -148,6 +154,7 @@ impl Client {
client_server_map: client_server_map,
default_server_role: default_server_role,
parameters: HashMap::new(),
stats: stats,
});
}
@@ -220,7 +227,6 @@ impl Client {
};
// Grab a server from the pool.
// None = any shard
let connection = match pool.get(shard, role).await {
Ok(conn) => conn,
Err(err) => {
@@ -287,11 +293,19 @@ impl Client {
}
}
// Release server
if !server.in_transaction() && self.transaction_mode {
shard = None;
role = self.default_server_role;
break;
// Send statistic
self.stats.query();
// Transaction over
if !server.in_transaction() {
self.stats.transaction();
// Release server
if self.transaction_mode {
shard = None;
role = self.default_server_role;
break;
}
}
}
@@ -350,11 +364,17 @@ impl Client {
}
}
self.stats.query();
// Release server
if !server.in_transaction() && self.transaction_mode {
shard = None;
role = self.default_server_role;
break;
if !server.in_transaction() {
self.stats.transaction();
if self.transaction_mode {
shard = None;
role = self.default_server_role;
break;
}
}
}
@@ -378,11 +398,14 @@ impl Client {
};
// Release the server
if !server.in_transaction() && self.transaction_mode {
println!("Releasing after copy done");
shard = None;
role = self.default_server_role;
break;
if !server.in_transaction() {
self.stats.transaction();
if self.transaction_mode {
shard = None;
role = self.default_server_role;
break;
}
}
}

View File

@@ -21,6 +21,7 @@ extern crate num_cpus;
extern crate once_cell;
extern crate serde;
extern crate serde_derive;
extern crate statsd;
extern crate tokio;
extern crate toml;
@@ -30,6 +31,7 @@ use tokio::signal;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
mod client;
mod config;
@@ -38,11 +40,13 @@ mod messages;
mod pool;
mod server;
mod sharding;
mod stats;
// Support for query cancellation: this maps our process_ids and
// secret keys to the backend's.
use config::Role;
use pool::{ClientServerMap, ConnectionPool};
use stats::{Collector, Reporter};
/// Main!
#[tokio::main(worker_threads = 4)]
@@ -87,7 +91,23 @@ async fn main() {
);
println!("> Connection timeout: {}ms", config.general.connect_timeout);
let mut pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await;
// Collect statistics and send them to StatsD
let (tx, rx) = mpsc::channel(100);
tokio::task::spawn(async move {
println!("> Statistics reporter started");
let mut stats_collector = Collector::new(rx);
stats_collector.collect().await;
});
let mut pool = ConnectionPool::from_config(
config.clone(),
client_server_map.clone(),
Reporter::new(tx.clone()),
)
.await;
let transaction_mode = config.general.pool_mode == "transaction";
let default_server_role = match config.query_router.default_role.as_ref() {
"any" => None,
@@ -115,6 +135,7 @@ async fn main() {
let pool = pool.clone();
let client_server_map = client_server_map.clone();
let server_info = server_info.clone();
let reporter = Reporter::new(tx.clone());
let (socket, addr) = match listener.accept().await {
Ok((socket, addr)) => (socket, addr),
@@ -136,6 +157,7 @@ async fn main() {
transaction_mode,
default_server_role,
server_info,
reporter,
)
.await
{
@@ -170,7 +192,10 @@ async fn main() {
// Setup shut down sequence
match signal::ctrl_c().await {
Ok(()) => {}
Ok(()) => {
println!("> Shutting down...");
}
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
}

View File

@@ -7,6 +7,7 @@ use chrono::naive::NaiveDateTime;
use crate::config::{Address, Config, Role, User};
use crate::errors::Error;
use crate::server::Server;
use crate::stats::Reporter;
use std::collections::HashMap;
use std::sync::{
@@ -14,6 +15,7 @@ use std::sync::{
Arc,
Mutex,
};
use std::time::Instant;
// Banlist: bad servers go in here.
pub type BanList = Arc<Mutex<Vec<HashMap<Address, NaiveDateTime>>>>;
@@ -29,11 +31,16 @@ pub struct ConnectionPool {
healthcheck_timeout: u64,
ban_time: i64,
pool_size: u32,
stats: Reporter,
}
impl ConnectionPool {
/// Construct the connection pool from a config file.
pub async fn from_config(config: Config, client_server_map: ClientServerMap) -> ConnectionPool {
pub async fn from_config(
config: Config,
client_server_map: ClientServerMap,
stats: Reporter,
) -> ConnectionPool {
let mut shards = Vec::new();
let mut addresses = Vec::new();
let mut banlist = Vec::new();
@@ -71,6 +78,7 @@ impl ConnectionPool {
config.user.clone(),
&shard.database,
client_server_map.clone(),
stats.clone(),
);
let pool = Pool::builder()
@@ -103,6 +111,7 @@ impl ConnectionPool {
healthcheck_timeout: config.general.healthcheck_timeout,
ban_time: config.general.ban_time,
pool_size: config.general.pool_size,
stats: stats,
}
}
@@ -149,6 +158,7 @@ impl ConnectionPool {
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
// Set this to false to gain ~3-4% speed.
let with_health_check = true;
let now = Instant::now();
let shard = match shard {
Some(shard) => shard,
@@ -227,6 +237,7 @@ impl ConnectionPool {
};
if !with_health_check {
self.stats.checkout_time(now.elapsed().as_millis());
return Ok((conn, address.clone()));
}
@@ -241,7 +252,10 @@ impl ConnectionPool {
{
// Check if health check succeeded
Ok(res) => match res {
Ok(_) => return Ok((conn, address.clone())),
Ok(_) => {
self.stats.checkout_time(now.elapsed().as_millis());
return Ok((conn, address.clone()));
}
Err(_) => {
println!(
">> Banning replica {} because of failed health check",
@@ -340,6 +354,7 @@ pub struct ServerPool {
user: User,
database: String,
client_server_map: ClientServerMap,
stats: Reporter,
}
impl ServerPool {
@@ -348,12 +363,14 @@ impl ServerPool {
user: User,
database: &str,
client_server_map: ClientServerMap,
stats: Reporter,
) -> ServerPool {
ServerPool {
address: address,
user: user,
database: database.to_string(),
client_server_map: client_server_map,
stats: stats,
}
}
}
@@ -375,6 +392,7 @@ impl ManageConnection for ServerPool {
&self.database,
self.client_server_map.clone(),
self.address.role,
self.stats.clone(),
)
.await
}

View File

@@ -11,6 +11,7 @@ use tokio::net::TcpStream;
use crate::config::{Address, Role};
use crate::errors::Error;
use crate::messages::*;
use crate::stats::Reporter;
use crate::ClientServerMap;
/// Server state.
@@ -54,6 +55,9 @@ pub struct Server {
// Server connected at
connected_at: chrono::naive::NaiveDateTime,
// Stats
stats: Reporter,
}
impl Server {
@@ -67,6 +71,7 @@ impl Server {
database: &str,
client_server_map: ClientServerMap,
role: Role,
stats: Reporter,
) -> Result<Server, Error> {
let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await {
Ok(stream) => stream,
@@ -198,6 +203,7 @@ impl Server {
client_server_map: client_server_map,
role: role,
connected_at: chrono::offset::Utc::now().naive_utc(),
stats: stats,
});
}
@@ -236,6 +242,8 @@ impl Server {
/// Send data to the server from the client.
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
self.stats.data_sent(messages.len());
match write_all_half(&mut self.write, messages).await {
Ok(_) => Ok(()),
Err(err) => {
@@ -280,8 +288,6 @@ impl Server {
self.in_transaction = false;
}
// Error client didn't clean up!
// We should drop this server
'E' => {
self.in_transaction = true;
}
@@ -332,6 +338,7 @@ impl Server {
}
let bytes = self.buffer.clone();
self.stats.data_received(bytes.len());
self.buffer.clear();
Ok(bytes)

163
src/stats.rs Normal file
View File

@@ -0,0 +1,163 @@
use statsd::Client;
/// Statistics collector and publisher.
use tokio::sync::mpsc::{Receiver, Sender};
use std::collections::HashMap;
use std::time::Instant;
/// The kinds of statistics a `Reporter` can emit and the `Collector` aggregates.
///
/// The enum is a plain tag with no payload, so it is cheap to copy, compare and
/// use as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StatisticName {
    /// Time, in milliseconds, it took to check a server connection out of the pool.
    CheckoutTime,
    // Not reported yet:
    //QueryRuntime,
    //TransactionTime,
    /// Number of queries executed.
    Queries,
    /// Number of transactions completed.
    Transactions,
    /// Amount of data sent to a server.
    DataSent,
    /// Amount of data received from a server.
    DataReceived,
}
/// A single measurement travelling from a `Reporter` to the `Collector`.
#[derive(Debug)]
pub struct Statistic {
/// The metric this measurement belongs to.
pub name: StatisticName,
/// The measured amount. Its meaning depends on `name`: an increment of one,
/// an amount of data, or a duration in milliseconds.
pub value: i64,
}
/// Cloneable handle used to report statistics.
///
/// Each reporter holds the sending half of a channel of `Statistic` values.
#[derive(Clone, Debug)]
pub struct Reporter {
// Sending half of the statistics channel.
tx: Sender<Statistic>,
}
impl Reporter {
pub fn new(tx: Sender<Statistic>) -> Reporter {
Reporter { tx: tx }
}
pub fn query(&mut self) {
let statistic = Statistic {
name: StatisticName::Queries,
value: 1,
};
let _ = self.tx.try_send(statistic);
}
pub fn transaction(&mut self) {
let statistic = Statistic {
name: StatisticName::Transactions,
value: 1,
};
let _ = self.tx.try_send(statistic);
}
pub fn data_sent(&mut self, amount: usize) {
let statistic = Statistic {
name: StatisticName::DataSent,
value: amount as i64,
};
let _ = self.tx.try_send(statistic);
}
pub fn data_received(&mut self, amount: usize) {
let statistic = Statistic {
name: StatisticName::DataReceived,
value: amount as i64,
};
let _ = self.tx.try_send(statistic);
}
pub fn checkout_time(&mut self, ms: u128) {
let statistic = Statistic {
name: StatisticName::CheckoutTime,
value: ms as i64,
};
let _ = self.tx.try_send(statistic);
}
}
/// Receives `Statistic` values from a channel, aggregates them and publishes
/// the aggregates to StatsD.
pub struct Collector {
// Receiving half of the statistics channel.
rx: Receiver<Statistic>,
// StatsD client used to publish the aggregated values.
client: Client,
}
impl Collector {
    /// Address of the StatsD daemon the statistics are published to.
    const STATSD_ADDRESS: &'static str = "127.0.0.1:8125";

    /// Prefix handed to the StatsD client.
    const STATSD_PREFIX: &'static str = "pgcat";

    /// Minimum time between two publications to StatsD.
    const PUBLISH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);

    /// Create a collector that reads statistics from `rx`.
    ///
    /// # Panics
    ///
    /// Panics if the StatsD client cannot be created.
    pub fn new(rx: Receiver<Statistic>) -> Collector {
        let client = Client::new(Self::STATSD_ADDRESS, Self::STATSD_PREFIX)
            .expect("failed to create the StatsD client");

        Collector { rx, client }
    }

    /// Aggregate incoming statistics and periodically publish them to StatsD.
    ///
    /// Runs until the channel yields `None`. Queries, transactions and data
    /// volumes are summed; for the checkout time the maximum observed value is
    /// kept. Once at least `PUBLISH_INTERVAL` has passed since the last
    /// publication, every aggregate is published as a gauge and reset to zero.
    pub async fn collect(&mut self) {
        let mut stats: HashMap<&'static str, i64> = HashMap::from([
            ("queries", 0),
            ("transactions", 0),
            ("data_sent", 0),
            ("data_received", 0),
            ("checkout_time", 0),
        ]);

        let mut last_published = Instant::now();

        while let Some(stat) = self.rx.recv().await {
            // Some are counters, some are gauges...
            match stat.name {
                StatisticName::Queries => Self::accumulate(&mut stats, "queries", stat.value),
                StatisticName::Transactions => {
                    Self::accumulate(&mut stats, "transactions", stat.value)
                }
                StatisticName::DataSent => Self::accumulate(&mut stats, "data_sent", stat.value),
                StatisticName::DataReceived => {
                    Self::accumulate(&mut stats, "data_received", stat.value)
                }
                // Report max time here
                StatisticName::CheckoutTime => {
                    Self::keep_max(&mut stats, "checkout_time", stat.value)
                }
            }

            // The interval is only checked when a statistic arrives. If there
            // is no traffic nothing is published, but it also doesn't matter then.
            // Comparing durations (rather than truncated whole seconds) makes
            // this fire at the interval instead of one second later.
            if last_published.elapsed() >= Self::PUBLISH_INTERVAL {
                let mut pipeline = self.client.pipeline();

                println!(">> Publishing statistics to StatsD: {:?}", stats);

                for (key, value) in stats.iter_mut() {
                    pipeline.gauge(key, *value as f64);
                    // Start the next window from zero.
                    *value = 0;
                }

                pipeline.send(&self.client);
                last_published = Instant::now();
            }
        }

        println!(">> Statistics collector is shutting down.");
    }

    /// Add `value` to the running total stored under `key`, clamping on overflow.
    fn accumulate(stats: &mut HashMap<&'static str, i64>, key: &'static str, value: i64) {
        let total = stats.entry(key).or_insert(0);
        *total = total.saturating_add(value);
    }

    /// Keep the larger of `value` and what is already stored under `key`.
    fn keep_max(stats: &mut HashMap<&'static str, i64>, key: &'static str, value: i64) {
        let current = stats.entry(key).or_insert(0);
        *current = (*current).max(value);
    }
}