mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 18:56:30 +00:00
Add server login stat; refactor for better naming (#34)
This commit is contained in:
19
src/pool.rs
19
src/pool.rs
@@ -351,7 +351,12 @@ impl ManageConnection for ServerPool {
|
|||||||
self.address, self.user.name
|
self.address, self.user.name
|
||||||
);
|
);
|
||||||
|
|
||||||
Server::startup(
|
// Put a temporary process_id into the stats
|
||||||
|
// for server login.
|
||||||
|
let process_id = rand::random::<i32>();
|
||||||
|
self.stats.server_login(process_id);
|
||||||
|
|
||||||
|
match Server::startup(
|
||||||
&self.address,
|
&self.address,
|
||||||
&self.user,
|
&self.user,
|
||||||
&self.database,
|
&self.database,
|
||||||
@@ -359,6 +364,18 @@ impl ManageConnection for ServerPool {
|
|||||||
self.stats.clone(),
|
self.stats.clone(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
{
|
||||||
|
Ok(conn) => {
|
||||||
|
// Remove the temporary process_id from the stats.
|
||||||
|
self.stats.server_disconnecting(process_id);
|
||||||
|
Ok(conn)
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
// Remove the temporary process_id from the stats.
|
||||||
|
self.stats.server_disconnecting(process_id);
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Determines if the connection is still connected to the database.
|
/// Determines if the connection is still connected to the database.
|
||||||
|
|||||||
156
src/stats.rs
156
src/stats.rs
@@ -1,6 +1,6 @@
|
|||||||
use log::info;
|
use log::info;
|
||||||
use statsd::Client;
|
use statsd::Client;
|
||||||
/// Statistics collector and publisher.
|
/// Events collector and publisher.
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -9,12 +9,10 @@ use std::time::Instant;
|
|||||||
use crate::config::get_config;
|
use crate::config::get_config;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub enum StatisticName {
|
enum EventName {
|
||||||
CheckoutTime,
|
CheckoutTime,
|
||||||
//QueryRuntime,
|
Query,
|
||||||
//TransactionTime,
|
Transaction,
|
||||||
Queries,
|
|
||||||
Transactions,
|
|
||||||
DataSent,
|
DataSent,
|
||||||
DataReceived,
|
DataReceived,
|
||||||
ClientWaiting,
|
ClientWaiting,
|
||||||
@@ -29,25 +27,25 @@ pub enum StatisticName {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Statistic {
|
pub struct Event {
|
||||||
pub name: StatisticName,
|
name: EventName,
|
||||||
pub value: i64,
|
value: i64,
|
||||||
pub process_id: Option<i32>,
|
process_id: Option<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Reporter {
|
pub struct Reporter {
|
||||||
tx: Sender<Statistic>,
|
tx: Sender<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Reporter {
|
impl Reporter {
|
||||||
pub fn new(tx: Sender<Statistic>) -> Reporter {
|
pub fn new(tx: Sender<Event>) -> Reporter {
|
||||||
Reporter { tx: tx }
|
Reporter { tx: tx }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn query(&mut self) {
|
pub fn query(&self) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::Queries,
|
name: EventName::Query,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: None,
|
process_id: None,
|
||||||
};
|
};
|
||||||
@@ -55,9 +53,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn transaction(&mut self) {
|
pub fn transaction(&self) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::Transactions,
|
name: EventName::Transaction,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: None,
|
process_id: None,
|
||||||
};
|
};
|
||||||
@@ -65,9 +63,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_sent(&mut self, amount: usize) {
|
pub fn data_sent(&self, amount: usize) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::DataSent,
|
name: EventName::DataSent,
|
||||||
value: amount as i64,
|
value: amount as i64,
|
||||||
process_id: None,
|
process_id: None,
|
||||||
};
|
};
|
||||||
@@ -75,9 +73,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_received(&mut self, amount: usize) {
|
pub fn data_received(&self, amount: usize) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::DataReceived,
|
name: EventName::DataReceived,
|
||||||
value: amount as i64,
|
value: amount as i64,
|
||||||
process_id: None,
|
process_id: None,
|
||||||
};
|
};
|
||||||
@@ -85,9 +83,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn checkout_time(&mut self, ms: u128) {
|
pub fn checkout_time(&self, ms: u128) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::CheckoutTime,
|
name: EventName::CheckoutTime,
|
||||||
value: ms as i64,
|
value: ms as i64,
|
||||||
process_id: None,
|
process_id: None,
|
||||||
};
|
};
|
||||||
@@ -95,9 +93,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_waiting(&mut self, process_id: i32) {
|
pub fn client_waiting(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ClientWaiting,
|
name: EventName::ClientWaiting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -105,9 +103,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_active(&mut self, process_id: i32) {
|
pub fn client_active(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ClientActive,
|
name: EventName::ClientActive,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -115,9 +113,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_idle(&mut self, process_id: i32) {
|
pub fn client_idle(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ClientIdle,
|
name: EventName::ClientIdle,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -125,9 +123,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_disconnecting(&mut self, process_id: i32) {
|
pub fn client_disconnecting(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ClientDisconnecting,
|
name: EventName::ClientDisconnecting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -135,9 +133,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_active(&mut self, process_id: i32) {
|
pub fn server_active(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ServerActive,
|
name: EventName::ServerActive,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -145,9 +143,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_idle(&mut self, process_id: i32) {
|
pub fn server_idle(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ServerIdle,
|
name: EventName::ServerIdle,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -155,9 +153,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_login(&mut self, process_id: i32) {
|
pub fn server_login(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ServerLogin,
|
name: EventName::ServerLogin,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -165,9 +163,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_tested(&mut self, process_id: i32) {
|
pub fn server_tested(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ServerTested,
|
name: EventName::ServerTested,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -175,9 +173,9 @@ impl Reporter {
|
|||||||
let _ = self.tx.try_send(statistic);
|
let _ = self.tx.try_send(statistic);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_disconnecting(&mut self, process_id: i32) {
|
pub fn server_disconnecting(&self, process_id: i32) {
|
||||||
let statistic = Statistic {
|
let statistic = Event {
|
||||||
name: StatisticName::ServerDisconnecting,
|
name: EventName::ServerDisconnecting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: Some(process_id),
|
||||||
};
|
};
|
||||||
@@ -187,12 +185,12 @@ impl Reporter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct Collector {
|
pub struct Collector {
|
||||||
rx: Receiver<Statistic>,
|
rx: Receiver<Event>,
|
||||||
client: Client,
|
client: Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Collector {
|
impl Collector {
|
||||||
pub fn new(rx: Receiver<Statistic>) -> Collector {
|
pub fn new(rx: Receiver<Event>) -> Collector {
|
||||||
Collector {
|
Collector {
|
||||||
rx: rx,
|
rx: rx,
|
||||||
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
|
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
|
||||||
@@ -200,7 +198,7 @@ impl Collector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn collect(&mut self) {
|
pub async fn collect(&mut self) {
|
||||||
info!("Statistics reporter started");
|
info!("Events reporter started");
|
||||||
|
|
||||||
let mut stats = HashMap::from([
|
let mut stats = HashMap::from([
|
||||||
("total_query_count", 0),
|
("total_query_count", 0),
|
||||||
@@ -219,7 +217,7 @@ impl Collector {
|
|||||||
("sv_tested", 0),
|
("sv_tested", 0),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let mut client_server_states: HashMap<i32, StatisticName> = HashMap::new();
|
let mut client_server_states: HashMap<i32, EventName> = HashMap::new();
|
||||||
|
|
||||||
let mut now = Instant::now();
|
let mut now = Instant::now();
|
||||||
|
|
||||||
@@ -227,34 +225,34 @@ impl Collector {
|
|||||||
let stat = match self.rx.recv().await {
|
let stat = match self.rx.recv().await {
|
||||||
Some(stat) => stat,
|
Some(stat) => stat,
|
||||||
None => {
|
None => {
|
||||||
info!("Statistics collector is shutting down");
|
info!("Events collector is shutting down");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Some are counters, some are gauges...
|
// Some are counters, some are gauges...
|
||||||
match stat.name {
|
match stat.name {
|
||||||
StatisticName::Queries => {
|
EventName::Query => {
|
||||||
let counter = stats.entry("total_query_count").or_insert(0);
|
let counter = stats.entry("total_query_count").or_insert(0);
|
||||||
*counter += stat.value;
|
*counter += stat.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::Transactions => {
|
EventName::Transaction => {
|
||||||
let counter = stats.entry("total_xact_count").or_insert(0);
|
let counter = stats.entry("total_xact_count").or_insert(0);
|
||||||
*counter += stat.value;
|
*counter += stat.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::DataSent => {
|
EventName::DataSent => {
|
||||||
let counter = stats.entry("total_sent").or_insert(0);
|
let counter = stats.entry("total_sent").or_insert(0);
|
||||||
*counter += stat.value;
|
*counter += stat.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::DataReceived => {
|
EventName::DataReceived => {
|
||||||
let counter = stats.entry("total_received").or_insert(0);
|
let counter = stats.entry("total_received").or_insert(0);
|
||||||
*counter += stat.value;
|
*counter += stat.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::CheckoutTime => {
|
EventName::CheckoutTime => {
|
||||||
let counter = stats.entry("total_wait_time").or_insert(0);
|
let counter = stats.entry("total_wait_time").or_insert(0);
|
||||||
*counter += stat.value;
|
*counter += stat.value;
|
||||||
|
|
||||||
@@ -273,17 +271,17 @@ impl Collector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ClientActive
|
EventName::ClientActive
|
||||||
| StatisticName::ClientWaiting
|
| EventName::ClientWaiting
|
||||||
| StatisticName::ClientIdle
|
| EventName::ClientIdle
|
||||||
| StatisticName::ServerActive
|
| EventName::ServerActive
|
||||||
| StatisticName::ServerIdle
|
| EventName::ServerIdle
|
||||||
| StatisticName::ServerTested
|
| EventName::ServerTested
|
||||||
| StatisticName::ServerLogin => {
|
| EventName::ServerLogin => {
|
||||||
client_server_states.insert(stat.process_id.unwrap(), stat.name);
|
client_server_states.insert(stat.process_id.unwrap(), stat.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ClientDisconnecting | StatisticName::ServerDisconnecting => {
|
EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
|
||||||
client_server_states.remove(&stat.process_id.unwrap());
|
client_server_states.remove(&stat.process_id.unwrap());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -293,37 +291,37 @@ impl Collector {
|
|||||||
if now.elapsed().as_secs() > 15 {
|
if now.elapsed().as_secs() > 15 {
|
||||||
for (_, state) in &client_server_states {
|
for (_, state) in &client_server_states {
|
||||||
match state {
|
match state {
|
||||||
StatisticName::ClientActive => {
|
EventName::ClientActive => {
|
||||||
let counter = stats.entry("cl_active").or_insert(0);
|
let counter = stats.entry("cl_active").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ClientWaiting => {
|
EventName::ClientWaiting => {
|
||||||
let counter = stats.entry("cl_waiting").or_insert(0);
|
let counter = stats.entry("cl_waiting").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ClientIdle => {
|
EventName::ClientIdle => {
|
||||||
let counter = stats.entry("cl_idle").or_insert(0);
|
let counter = stats.entry("cl_idle").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ServerIdle => {
|
EventName::ServerIdle => {
|
||||||
let counter = stats.entry("sv_idle").or_insert(0);
|
let counter = stats.entry("sv_idle").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ServerActive => {
|
EventName::ServerActive => {
|
||||||
let counter = stats.entry("sv_active").or_insert(0);
|
let counter = stats.entry("sv_active").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ServerTested => {
|
EventName::ServerTested => {
|
||||||
let counter = stats.entry("sv_tested").or_insert(0);
|
let counter = stats.entry("sv_tested").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
StatisticName::ServerLogin => {
|
EventName::ServerLogin => {
|
||||||
let counter = stats.entry("sv_login").or_insert(0);
|
let counter = stats.entry("sv_login").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user