use logger lib; minor refactor; sv_* stats (#29)

This commit is contained in:
Lev Kokotov
2022-02-20 22:47:08 -08:00
committed by GitHub
parent 108f5715c0
commit 44b5e7eeee
9 changed files with 236 additions and 89 deletions

49
Cargo.lock generated
View File

@@ -28,6 +28,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.1" version = "1.0.1"
@@ -116,6 +127,19 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "env_logger"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.19" version = "0.3.19"
@@ -181,6 +205,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.12" version = "0.1.12"
@@ -329,6 +359,7 @@ dependencies = [
"bb8", "bb8",
"bytes", "bytes",
"chrono", "chrono",
"env_logger",
"log", "log",
"md-5", "md-5",
"num_cpus", "num_cpus",
@@ -530,6 +561,15 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "termcolor"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.44" version = "0.1.44"
@@ -620,6 +660,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View File

@@ -24,3 +24,4 @@ statsd = "0.15"
sqlparser = "0.14" sqlparser = "0.14"
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"
env_logger = "0.9"

View File

@@ -2,6 +2,7 @@
/// We are pretending to the server in this scenario, /// We are pretending to the server in this scenario,
/// and this module implements that. /// and this module implements that.
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::error;
use tokio::io::{AsyncReadExt, BufReader}; use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{ use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf}, tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -253,7 +254,7 @@ impl Client {
let connection = match pool.get(query_router.shard(), query_router.role()).await { let connection = match pool.get(query_router.shard(), query_router.role()).await {
Ok(conn) => conn, Ok(conn) => conn,
Err(err) => { Err(err) => {
println!(">> Could not get connection from pool: {:?}", err); error!("Could not get connection from pool: {:?}", err);
error_response(&mut self.write, "could not get connection from the pool") error_response(&mut self.write, "could not get connection from the pool")
.await?; .await?;
continue; continue;
@@ -267,8 +268,9 @@ impl Client {
// Claim this server as mine for query cancellation. // Claim this server as mine for query cancellation.
server.claim(self.process_id, self.secret_key); server.claim(self.process_id, self.secret_key);
// Client active // Client active & server active
self.stats.client_active(self.process_id); self.stats.client_active(self.process_id);
self.stats.server_active(server.process_id());
// Transaction loop. Multiple queries can be issued by the client here. // Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over, // The connection belongs to the client until the transaction is over,
@@ -338,7 +340,7 @@ impl Client {
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
// Report this client as idle. self.stats.server_idle(server.process_id());
break; break;
} }
} }
@@ -420,6 +422,7 @@ impl Client {
self.stats.transaction(); self.stats.transaction();
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id());
break; break;
} }
} }
@@ -453,6 +456,7 @@ impl Client {
self.stats.transaction(); self.stats.transaction();
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id());
break; break;
} }
} }
@@ -461,7 +465,7 @@ impl Client {
// Some unexpected message. We either did not implement the protocol correctly // Some unexpected message. We either did not implement the protocol correctly
// or this is not a Postgres client we're talking to. // or this is not a Postgres client we're talking to.
_ => { _ => {
println!(">>> Unexpected code: {}", code); error!("Unexpected code: {}", code);
} }
} }
} }

View File

@@ -1,4 +1,5 @@
use arc_swap::{ArcSwap, Guard}; use arc_swap::{ArcSwap, Guard};
use log::{error, info};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde_derive::Deserialize; use serde_derive::Deserialize;
use tokio::fs::File; use tokio::fs::File;
@@ -150,14 +151,14 @@ impl Default for Config {
impl Config { impl Config {
pub fn show(&self) { pub fn show(&self) {
println!("> Pool size: {}", self.general.pool_size); info!("Pool size: {}", self.general.pool_size);
println!("> Pool mode: {}", self.general.pool_mode); info!("Pool mode: {}", self.general.pool_mode);
println!("> Ban time: {}s", self.general.ban_time); info!("Ban time: {}s", self.general.ban_time);
println!( info!(
"> Healthcheck timeout: {}ms", "Healthcheck timeout: {}ms",
self.general.healthcheck_timeout self.general.healthcheck_timeout
); );
println!("> Connection timeout: {}ms", self.general.connect_timeout); info!("Connection timeout: {}ms", self.general.connect_timeout);
} }
} }
@@ -171,7 +172,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
let mut file = match File::open(path).await { let mut file = match File::open(path).await {
Ok(file) => file, Ok(file) => file,
Err(err) => { Err(err) => {
println!("> Config error: {:?}", err); error!("{:?}", err);
return Err(Error::BadConfig); return Err(Error::BadConfig);
} }
}; };
@@ -179,7 +180,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
match file.read_to_string(&mut contents).await { match file.read_to_string(&mut contents).await {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
println!("> Config error: {:?}", err); error!("{:?}", err);
return Err(Error::BadConfig); return Err(Error::BadConfig);
} }
}; };
@@ -187,7 +188,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
let config: Config = match toml::from_str(&contents) { let config: Config = match toml::from_str(&contents) {
Ok(config) => config, Ok(config) => config,
Err(err) => { Err(err) => {
println!("> Config error: {:?}", err); error!("{:?}", err);
return Err(Error::BadConfig); return Err(Error::BadConfig);
} }
}; };
@@ -200,7 +201,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
let mut primary_count = 0; let mut primary_count = 0;
if shard.1.servers.len() == 0 { if shard.1.servers.len() == 0 {
println!("> Shard {} has no servers configured", shard.0); error!("Shard {} has no servers configured", shard.0);
return Err(Error::BadConfig); return Err(Error::BadConfig);
} }
@@ -218,8 +219,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
"primary" => (), "primary" => (),
"replica" => (), "replica" => (),
_ => { _ => {
println!( error!(
"> Shard {} server role must be either 'primary' or 'replica', got: '{}'", "Shard {} server role must be either 'primary' or 'replica', got: '{}'",
shard.0, server.2 shard.0, server.2
); );
return Err(Error::BadConfig); return Err(Error::BadConfig);
@@ -228,12 +229,12 @@ pub async fn parse(path: &str) -> Result<(), Error> {
} }
if primary_count > 1 { if primary_count > 1 {
println!("> Shard {} has more than on primary configured.", &shard.0); error!("Shard {} has more than on primary configured", &shard.0);
return Err(Error::BadConfig); return Err(Error::BadConfig);
} }
if dup_check.len() != shard.1.servers.len() { if dup_check.len() != shard.1.servers.len() {
println!("> Shard {} contains duplicate server configs.", &shard.0); error!("Shard {} contains duplicate server configs", &shard.0);
return Err(Error::BadConfig); return Err(Error::BadConfig);
} }
} }
@@ -243,8 +244,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
"primary" => (), "primary" => (),
"replica" => (), "replica" => (),
other => { other => {
println!( error!(
"> Query router default_role must be 'primary', 'replica', or 'any', got: '{}'", "Query router default_role must be 'primary', 'replica', or 'any', got: '{}'",
other other
); );
return Err(Error::BadConfig); return Err(Error::BadConfig);

View File

@@ -23,6 +23,7 @@ extern crate arc_swap;
extern crate async_trait; extern crate async_trait;
extern crate bb8; extern crate bb8;
extern crate bytes; extern crate bytes;
extern crate env_logger;
extern crate log; extern crate log;
extern crate md5; extern crate md5;
extern crate num_cpus; extern crate num_cpus;
@@ -34,15 +35,16 @@ extern crate statsd;
extern crate tokio; extern crate tokio;
extern crate toml; extern crate toml;
use log::{error, info};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::{ use tokio::{
signal, signal,
signal::unix::{signal as unix_signal, SignalKind}, signal::unix::{signal as unix_signal, SignalKind},
sync::mpsc,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
mod client; mod client;
mod config; mod config;
@@ -64,11 +66,12 @@ use stats::{Collector, Reporter};
/// Main! /// Main!
#[tokio::main(worker_threads = 4)] #[tokio::main(worker_threads = 4)]
async fn main() { async fn main() {
println!("> Welcome to PgCat! Meow."); env_logger::init();
info!("Welcome to PgCat! Meow.");
// Prepare regexes // Prepare regexes
if !query_router::QueryRouter::setup() { if !query_router::QueryRouter::setup() {
println!("> Could not setup query router."); error!("Could not setup query router");
return; return;
} }
@@ -76,7 +79,7 @@ async fn main() {
match config::parse("pgcat.toml").await { match config::parse("pgcat.toml").await {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
println!("> Config parse error: {:?}", err); error!("Config parse error: {:?}", err);
return; return;
} }
}; };
@@ -87,12 +90,12 @@ async fn main() {
let listener = match TcpListener::bind(&addr).await { let listener = match TcpListener::bind(&addr).await {
Ok(sock) => sock, Ok(sock) => sock,
Err(err) => { Err(err) => {
println!("> Error: {:?}", err); error!("Listener socket error: {:?}", err);
return; return;
} }
}; };
println!("> Running on {}", addr); info!("Running on {}", addr);
config.show(); config.show();
// Tracks which client is connected to which server for query cancellation. // Tracks which client is connected to which server for query cancellation.
@@ -102,8 +105,6 @@ async fn main() {
let (tx, rx) = mpsc::channel(100); let (tx, rx) = mpsc::channel(100);
tokio::task::spawn(async move { tokio::task::spawn(async move {
println!("> Statistics reporter started");
let mut stats_collector = Collector::new(rx); let mut stats_collector = Collector::new(rx);
stats_collector.collect().await; stats_collector.collect().await;
}); });
@@ -114,12 +115,12 @@ async fn main() {
let server_info = match pool.validate().await { let server_info = match pool.validate().await {
Ok(info) => info, Ok(info) => info,
Err(err) => { Err(err) => {
println!("> Could not validate connection pool: {:?}", err); error!("Could not validate connection pool: {:?}", err);
return; return;
} }
}; };
println!("> Waiting for clients..."); info!("Waiting for clients");
// Main app runs here. // Main app runs here.
tokio::task::spawn(async move { tokio::task::spawn(async move {
@@ -132,7 +133,7 @@ async fn main() {
let (socket, addr) = match listener.accept().await { let (socket, addr) = match listener.accept().await {
Ok((socket, addr)) => (socket, addr), Ok((socket, addr)) => (socket, addr),
Err(err) => { Err(err) => {
println!("> Listener: {:?}", err); error!("{:?}", err);
continue; continue;
} }
}; };
@@ -140,35 +141,31 @@ async fn main() {
// Client goes to another thread, bye. // Client goes to another thread, bye.
tokio::task::spawn(async move { tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc(); let start = chrono::offset::Utc::now().naive_utc();
println!(">> Client {:?} connected", addr);
match client::Client::startup(socket, client_server_map, server_info, reporter) match client::Client::startup(socket, client_server_map, server_info, reporter)
.await .await
{ {
Ok(mut client) => { Ok(mut client) => {
println!(">> Client {:?} authenticated successfully!", addr); info!("Client {:?} connected", addr);
match client.handle(pool).await { match client.handle(pool).await {
Ok(()) => { Ok(()) => {
let duration = chrono::offset::Utc::now().naive_utc() - start; let duration = chrono::offset::Utc::now().naive_utc() - start;
println!( info!(
">> Client {:?} disconnected, session duration: {}", "Client {:?} disconnected, session duration: {}",
addr, addr,
format_duration(&duration) format_duration(&duration)
); );
} }
Err(err) => { Err(err) => {
println!(">> Client disconnected with error: {:?}", err); error!("Client disconnected with error: {:?}", err);
client.release(); client.release();
} }
} }
} }
Err(err) => { Err(err) => {
println!(">> Error: {:?}", err); error!("Client failed to login: {:?}", err);
} }
}; };
}); });
@@ -182,13 +179,13 @@ async fn main() {
loop { loop {
stream.recv().await; stream.recv().await;
println!("> Reloading config"); info!("Reloading config");
match config::parse("pgcat.toml").await { match config::parse("pgcat.toml").await {
Ok(_) => { Ok(_) => {
get_config().show(); get_config().show();
} }
Err(err) => { Err(err) => {
println!("> Config parse error: {:?}", err); error!("{:?}", err);
return; return;
} }
}; };
@@ -198,11 +195,11 @@ async fn main() {
// Setup shut down sequence // Setup shut down sequence
match signal::ctrl_c().await { match signal::ctrl_c().await {
Ok(()) => { Ok(()) => {
println!("> Shutting down..."); info!("Shutting down...");
} }
Err(err) => { Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err); error!("Unable to listen for shutdown signal: {}", err);
} }
}; };
} }

View File

@@ -3,6 +3,7 @@ use async_trait::async_trait;
use bb8::{ManageConnection, Pool, PooledConnection}; use bb8::{ManageConnection, Pool, PooledConnection};
use bytes::BytesMut; use bytes::BytesMut;
use chrono::naive::NaiveDateTime; use chrono::naive::NaiveDateTime;
use log::{error, info, warn};
use crate::config::{get_config, Address, Role, User}; use crate::config::{get_config, Address, Role, User};
use crate::errors::Error; use crate::errors::Error;
@@ -54,7 +55,7 @@ impl ConnectionPool {
"primary" => Role::Primary, "primary" => Role::Primary,
"replica" => Role::Replica, "replica" => Role::Replica,
_ => { _ => {
println!("> Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2); error!("Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2);
Role::Replica Role::Replica
} }
}; };
@@ -118,7 +119,7 @@ impl ConnectionPool {
let connection = match self.get(shard, None).await { let connection = match self.get(shard, None).await {
Ok(conn) => conn, Ok(conn) => conn,
Err(err) => { Err(err) => {
println!("> Shard {} down or misconfigured: {:?}", shard, err); error!("Shard {} down or misconfigured: {:?}", shard, err);
continue; continue;
} }
}; };
@@ -166,8 +167,8 @@ impl ConnectionPool {
}; };
if !exists { if !exists {
log::error!( error!(
"ConnectionPool::get Requested role {:?}, but none is configured.", "Requested role {:?}, but none are configured",
role role
); );
return Err(Error::BadConfig); return Err(Error::BadConfig);
@@ -198,7 +199,7 @@ impl ConnectionPool {
let mut conn = match self.databases[shard][index].get().await { let mut conn = match self.databases[shard][index].get().await {
Ok(conn) => conn, Ok(conn) => conn,
Err(err) => { Err(err) => {
println!(">> Banning replica {}, error: {:?}", index, err); error!("Banning replica {}, error: {:?}", index, err);
self.ban(address, shard); self.ban(address, shard);
continue; continue;
} }
@@ -208,6 +209,8 @@ impl ConnectionPool {
let server = &mut *conn; let server = &mut *conn;
let healthcheck_timeout = get_config().general.healthcheck_timeout; let healthcheck_timeout = get_config().general.healthcheck_timeout;
self.stats.server_tested(server.process_id());
match tokio::time::timeout( match tokio::time::timeout(
tokio::time::Duration::from_millis(healthcheck_timeout), tokio::time::Duration::from_millis(healthcheck_timeout),
server.query("SELECT 1"), server.query("SELECT 1"),
@@ -218,13 +221,11 @@ impl ConnectionPool {
Ok(res) => match res { Ok(res) => match res {
Ok(_) => { Ok(_) => {
self.stats.checkout_time(now.elapsed().as_micros()); self.stats.checkout_time(now.elapsed().as_micros());
self.stats.server_idle(conn.process_id());
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
Err(_) => { Err(_) => {
println!( error!("Banning replica {} because of failed health check", index);
">> Banning replica {} because of failed health check",
index
);
// Don't leave a bad connection in the pool. // Don't leave a bad connection in the pool.
server.mark_bad(); server.mark_bad();
@@ -234,10 +235,7 @@ impl ConnectionPool {
}, },
// Health check never came back, database is really really down // Health check never came back, database is really really down
Err(_) => { Err(_) => {
println!( error!("Banning replica {} because of health check timeout", index);
">> Banning replica {} because of health check timeout",
index
);
// Don't leave a bad connection in the pool. // Don't leave a bad connection in the pool.
server.mark_bad(); server.mark_bad();
@@ -254,7 +252,7 @@ impl ConnectionPool {
/// traffic for any new transactions. Existing transactions on that replica /// traffic for any new transactions. Existing transactions on that replica
/// will finish successfully or error out to the clients. /// will finish successfully or error out to the clients.
pub fn ban(&self, address: &Address, shard: usize) { pub fn ban(&self, address: &Address, shard: usize) {
println!(">> Banning {:?}", address); error!("Banning {:?}", address);
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();
let mut guard = self.banlist.lock().unwrap(); let mut guard = self.banlist.lock().unwrap();
guard[shard].insert(address.clone(), now); guard[shard].insert(address.clone(), now);
@@ -287,7 +285,7 @@ impl ConnectionPool {
if guard[shard].len() == replicas_available { if guard[shard].len() == replicas_available {
guard[shard].clear(); guard[shard].clear();
drop(guard); drop(guard);
println!(">> Unbanning all replicas."); warn!("Unbanning all replicas.");
return false; return false;
} }
@@ -351,7 +349,10 @@ impl ManageConnection for ServerPool {
/// Attempts to create a new connection. /// Attempts to create a new connection.
async fn connect(&self) -> Result<Self::Connection, Self::Error> { async fn connect(&self) -> Result<Self::Connection, Self::Error> {
println!(">> Creating a new connection for the pool"); info!(
"Creating a new connection to {:?} using user {:?}",
self.address, self.user.name
);
Server::startup( Server::startup(
&self.address, &self.address,

View File

@@ -8,6 +8,7 @@ use regex::RegexSet;
use sqlparser::ast::Statement::{Query, StartTransaction}; use sqlparser::ast::Statement::{Query, StartTransaction};
use sqlparser::dialect::PostgreSqlDialect; use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser; use sqlparser::parser::Parser;
use log::{error, debug};
const CUSTOM_SQL_REGEXES: [&str; 5] = [ const CUSTOM_SQL_REGEXES: [&str; 5] = [
r"(?i)SET SHARDING KEY TO '[0-9]+'", r"(?i)SET SHARDING KEY TO '[0-9]+'",
@@ -54,7 +55,7 @@ impl QueryRouter {
let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) { let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) {
Ok(rgx) => rgx, Ok(rgx) => rgx,
Err(err) => { Err(err) => {
log::error!("QueryRouter::setup Could not compile regex set: {:?}", err); error!("QueryRouter::setup Could not compile regex set: {:?}", err);
return false; return false;
} }
}; };
@@ -219,8 +220,8 @@ impl QueryRouter {
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) { let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
Ok(ast) => ast, Ok(ast) => ast,
Err(err) => { Err(err) => {
log::debug!( debug!(
"QueryParser::infer_role could not parse query, error: {:?}, query: {}", "{:?}, query: {}",
err, err,
query query
); );

View File

@@ -1,6 +1,7 @@
use bytes::{Buf, BufMut, BytesMut};
///! Implementation of the PostgreSQL server (database) protocol. ///! Implementation of the PostgreSQL server (database) protocol.
///! Here we are pretending to the a Postgres client. ///! Here we are pretending to the a Postgres client.
use bytes::{Buf, BufMut, BytesMut}; use log::{error, info};
use tokio::io::{AsyncReadExt, BufReader}; use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{ use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf}, tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -33,7 +34,7 @@ pub struct Server {
server_info: BytesMut, server_info: BytesMut,
// Backend id and secret key used for query cancellation. // Backend id and secret key used for query cancellation.
backend_id: i32, process_id: i32,
secret_key: i32, secret_key: i32,
// Is the server inside a transaction or idle. // Is the server inside a transaction or idle.
@@ -69,7 +70,7 @@ impl Server {
match TcpStream::connect(&format!("{}:{}", &address.host, &address.port)).await { match TcpStream::connect(&format!("{}:{}", &address.host, &address.port)).await {
Ok(stream) => stream, Ok(stream) => stream,
Err(err) => { Err(err) => {
println!(">> Could not connect to server: {}", err); error!("Could not connect to server: {}", err);
return Err(Error::SocketError); return Err(Error::SocketError);
} }
}; };
@@ -78,7 +79,7 @@ impl Server {
startup(&mut stream, &user.name, database).await?; startup(&mut stream, &user.name, database).await?;
let mut server_info = BytesMut::new(); let mut server_info = BytesMut::new();
let mut backend_id: i32 = 0; let mut process_id: i32 = 0;
let mut secret_key: i32 = 0; let mut secret_key: i32 = 0;
// We'll be handling multiple packets, but they will all be structured the same. // We'll be handling multiple packets, but they will all be structured the same.
@@ -121,7 +122,7 @@ impl Server {
AUTHENTICATION_SUCCESSFUL => (), AUTHENTICATION_SUCCESSFUL => (),
_ => { _ => {
println!(">> Unsupported authentication mechanism: {}", auth_code); error!("Unsupported authentication mechanism: {}", auth_code);
return Err(Error::ServerError); return Err(Error::ServerError);
} }
} }
@@ -151,7 +152,7 @@ impl Server {
// TODO: the error message contains multiple fields; we can decode them and // TODO: the error message contains multiple fields; we can decode them and
// present a prettier message to the user. // present a prettier message to the user.
// See: https://www.postgresql.org/docs/12/protocol-error-fields.html // See: https://www.postgresql.org/docs/12/protocol-error-fields.html
println!(">> Server error: {}", String::from_utf8_lossy(&error)); error!("Server error: {}", String::from_utf8_lossy(&error));
} }
}; };
@@ -179,7 +180,7 @@ impl Server {
'K' => { 'K' => {
// The frontend must save these values if it wishes to be able to issue CancelRequest messages later. // The frontend must save these values if it wishes to be able to issue CancelRequest messages later.
// See: https://www.postgresql.org/docs/12/protocol-message-formats.html // See: https://www.postgresql.org/docs/12/protocol-message-formats.html
backend_id = match stream.read_i32().await { process_id = match stream.read_i32().await {
Ok(id) => id, Ok(id) => id,
Err(_) => return Err(Error::SocketError), Err(_) => return Err(Error::SocketError),
}; };
@@ -209,7 +210,7 @@ impl Server {
write: write, write: write,
buffer: BytesMut::with_capacity(8196), buffer: BytesMut::with_capacity(8196),
server_info: server_info, server_info: server_info,
backend_id: backend_id, process_id: process_id,
secret_key: secret_key, secret_key: secret_key,
in_transaction: false, in_transaction: false,
data_available: false, data_available: false,
@@ -223,7 +224,7 @@ impl Server {
// We have an unexpected message from the server during this exchange. // We have an unexpected message from the server during this exchange.
// Means we implemented the protocol wrong or we're not talking to a Postgres server. // Means we implemented the protocol wrong or we're not talking to a Postgres server.
_ => { _ => {
println!(">> Unknown code: {}", code); error!("Unknown code: {}", code);
return Err(Error::ProtocolSyncError); return Err(Error::ProtocolSyncError);
} }
}; };
@@ -241,7 +242,7 @@ impl Server {
let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await { let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await {
Ok(stream) => stream, Ok(stream) => stream,
Err(err) => { Err(err) => {
println!(">> Could not connect to server: {}", err); error!("Could not connect to server: {}", err);
return Err(Error::SocketError); return Err(Error::SocketError);
} }
}; };
@@ -262,7 +263,7 @@ impl Server {
match write_all_half(&mut self.write, messages).await { match write_all_half(&mut self.write, messages).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
println!(">> Terminating server because of: {:?}", err); error!("Terminating server because of: {:?}", err);
self.bad = true; self.bad = true;
Err(err) Err(err)
} }
@@ -277,7 +278,7 @@ impl Server {
let mut message = match read_message(&mut self.read).await { let mut message = match read_message(&mut self.read).await {
Ok(message) => message, Ok(message) => message,
Err(err) => { Err(err) => {
println!(">> Terminating server because of: {:?}", err); error!("Terminating server because of: {:?}", err);
self.bad = true; self.bad = true;
return Err(err); return Err(err);
} }
@@ -396,7 +397,7 @@ impl Server {
/// Indicate that this server connection cannot be re-used and must be discarded. /// Indicate that this server connection cannot be re-used and must be discarded.
pub fn mark_bad(&mut self) { pub fn mark_bad(&mut self) {
println!(">> Server marked bad"); error!("Server marked bad");
self.bad = true; self.bad = true;
} }
@@ -406,7 +407,7 @@ impl Server {
guard.insert( guard.insert(
(process_id, secret_key), (process_id, secret_key),
( (
self.backend_id, self.process_id,
self.secret_key, self.secret_key,
self.address.host.clone(), self.address.host.clone(),
self.address.port.clone(), self.address.port.clone(),
@@ -455,6 +456,10 @@ impl Server {
pub fn address(&self) -> Address { pub fn address(&self) -> Address {
self.address.clone() self.address.clone()
} }
pub fn process_id(&self) -> i32 {
self.process_id
}
} }
impl Drop for Server { impl Drop for Server {
@@ -462,6 +467,8 @@ impl Drop for Server {
/// the socket is in non-blocking mode, so it may not be ready /// the socket is in non-blocking mode, so it may not be ready
/// for a write. /// for a write.
fn drop(&mut self) { fn drop(&mut self) {
self.stats.server_disconnecting(self.process_id());
let mut bytes = BytesMut::with_capacity(4); let mut bytes = BytesMut::with_capacity(4);
bytes.put_u8(b'X'); bytes.put_u8(b'X');
bytes.put_i32(4); bytes.put_i32(4);
@@ -476,8 +483,8 @@ impl Drop for Server {
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();
let duration = now - self.connected_at; let duration = now - self.connected_at;
println!( info!(
">> Server connection closed, session duration: {}", "Server connection closed, session duration: {}",
crate::format_duration(&duration) crate::format_duration(&duration)
); );
} }

View File

@@ -1,3 +1,4 @@
use log::info;
use statsd::Client; use statsd::Client;
/// Statistics collector and publisher. /// Statistics collector and publisher.
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
@@ -20,6 +21,11 @@ pub enum StatisticName {
ClientActive, ClientActive,
ClientIdle, ClientIdle,
ClientDisconnecting, ClientDisconnecting,
ServerActive,
ServerIdle,
ServerTested,
ServerLogin,
ServerDisconnecting,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -100,7 +106,6 @@ impl Reporter {
} }
pub fn client_active(&mut self, process_id: i32) { pub fn client_active(&mut self, process_id: i32) {
let statistic = Statistic { let statistic = Statistic {
name: StatisticName::ClientActive, name: StatisticName::ClientActive,
value: 1, value: 1,
@@ -129,6 +134,56 @@ impl Reporter {
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(statistic);
} }
pub fn server_active(&mut self, process_id: i32) {
let statistic = Statistic {
name: StatisticName::ServerActive,
value: 1,
process_id: Some(process_id),
};
let _ = self.tx.try_send(statistic);
}
pub fn server_idle(&mut self, process_id: i32) {
let statistic = Statistic {
name: StatisticName::ServerIdle,
value: 1,
process_id: Some(process_id),
};
let _ = self.tx.try_send(statistic);
}
pub fn server_login(&mut self, process_id: i32) {
let statistic = Statistic {
name: StatisticName::ServerLogin,
value: 1,
process_id: Some(process_id),
};
let _ = self.tx.try_send(statistic);
}
pub fn server_tested(&mut self, process_id: i32) {
let statistic = Statistic {
name: StatisticName::ServerTested,
value: 1,
process_id: Some(process_id),
};
let _ = self.tx.try_send(statistic);
}
pub fn server_disconnecting(&mut self, process_id: i32) {
let statistic = Statistic {
name: StatisticName::ServerDisconnecting,
value: 1,
process_id: Some(process_id),
};
let _ = self.tx.try_send(statistic);
}
} }
pub struct Collector { pub struct Collector {
@@ -145,6 +200,8 @@ impl Collector {
} }
pub async fn collect(&mut self) { pub async fn collect(&mut self) {
info!("Statistics reporter started");
let mut stats = HashMap::from([ let mut stats = HashMap::from([
("total_query_count", 0), ("total_query_count", 0),
("total_xact_count", 0), ("total_xact_count", 0),
@@ -156,9 +213,13 @@ impl Collector {
("cl_waiting", 0), ("cl_waiting", 0),
("cl_active", 0), ("cl_active", 0),
("cl_idle", 0), ("cl_idle", 0),
("sv_idle", 0),
("sv_active", 0),
("sv_login", 0),
("sv_tested", 0),
]); ]);
let mut client_states: HashMap<i32, StatisticName> = HashMap::new(); let mut client_server_states: HashMap<i32, StatisticName> = HashMap::new();
let mut now = Instant::now(); let mut now = Instant::now();
@@ -166,7 +227,7 @@ impl Collector {
let stat = match self.rx.recv().await { let stat = match self.rx.recv().await {
Some(stat) => stat, Some(stat) => stat,
None => { None => {
println!(">> Statistics collector is shutting down."); info!("Statistics collector is shutting down");
return; return;
} }
}; };
@@ -212,20 +273,25 @@ impl Collector {
} }
} }
StatisticName::ClientActive | StatisticName::ClientWaiting | StatisticName::ClientIdle => { StatisticName::ClientActive
client_states.insert(stat.process_id.unwrap(), stat.name); | StatisticName::ClientWaiting
| StatisticName::ClientIdle
| StatisticName::ServerActive
| StatisticName::ServerIdle
| StatisticName::ServerTested
| StatisticName::ServerLogin => {
client_server_states.insert(stat.process_id.unwrap(), stat.name);
} }
StatisticName::ClientDisconnecting => { StatisticName::ClientDisconnecting | StatisticName::ServerDisconnecting => {
client_states.remove(&stat.process_id.unwrap()); client_server_states.remove(&stat.process_id.unwrap());
} }
}; };
// It's been 15 seconds. If there is no traffic, it won't publish anything, // It's been 15 seconds. If there is no traffic, it won't publish anything,
// but it also doesn't matter then. // but it also doesn't matter then.
if now.elapsed().as_secs() > 15 { if now.elapsed().as_secs() > 15 {
for (_, state) in &client_states { for (_, state) in &client_server_states {
match state { match state {
StatisticName::ClientActive => { StatisticName::ClientActive => {
let counter = stats.entry("cl_active").or_insert(0); let counter = stats.entry("cl_active").or_insert(0);
@@ -242,11 +308,31 @@ impl Collector {
*counter += 1; *counter += 1;
} }
StatisticName::ServerIdle => {
let counter = stats.entry("sv_idle").or_insert(0);
*counter += 1;
}
StatisticName::ServerActive => {
let counter = stats.entry("sv_active").or_insert(0);
*counter += 1;
}
StatisticName::ServerTested => {
let counter = stats.entry("sv_tested").or_insert(0);
*counter += 1;
}
StatisticName::ServerLogin => {
let counter = stats.entry("sv_login").or_insert(0);
*counter += 1;
}
_ => unreachable!(), _ => unreachable!(),
}; };
} }
println!(">> Reporting to StatsD: {:?}", stats); info!("{:?}", stats);
let mut pipeline = self.client.pipeline(); let mut pipeline = self.client.pipeline();