mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 10:26:30 +00:00
@@ -133,7 +133,7 @@ async fn main() {
|
|||||||
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
|
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
// Statistics reporting.
|
// Statistics reporting.
|
||||||
let (tx, rx) = mpsc::channel(100);
|
let (tx, rx) = mpsc::channel(100_000);
|
||||||
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
|
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
|
||||||
|
|
||||||
// Connection pool that allows to query all shards and replicas.
|
// Connection pool that allows to query all shards and replicas.
|
||||||
|
|||||||
52
src/stats.rs
52
src/stats.rs
@@ -1,9 +1,10 @@
|
|||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
/// Statistics and reporting.
|
/// Statistics and reporting.
|
||||||
use log::info;
|
use log::{error, info, trace};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
|
||||||
use crate::pool::get_number_of_addresses;
|
use crate::pool::get_number_of_addresses;
|
||||||
@@ -43,7 +44,7 @@ enum EventName {
|
|||||||
|
|
||||||
/// Event data sent to the collector
|
/// Event data sent to the collector
|
||||||
/// from clients and servers.
|
/// from clients and servers.
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Event {
|
pub struct Event {
|
||||||
/// The name of the event being reported.
|
/// The name of the event being reported.
|
||||||
name: EventName,
|
name: EventName,
|
||||||
@@ -79,6 +80,25 @@ impl Reporter {
|
|||||||
Reporter { tx: tx }
|
Reporter { tx: tx }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send statistics to the task keeping track of stats.
|
||||||
|
fn send(&self, event: Event) {
|
||||||
|
let name = event.name;
|
||||||
|
let result = self.tx.try_send(event);
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(_) => trace!(
|
||||||
|
"{:?} event reported successfully, capacity: {}",
|
||||||
|
name,
|
||||||
|
self.tx.capacity()
|
||||||
|
),
|
||||||
|
|
||||||
|
Err(err) => match err {
|
||||||
|
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||||
|
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// Report a query executed by a client against
|
/// Report a query executed by a client against
|
||||||
/// a server identified by the `address_id`.
|
/// a server identified by the `address_id`.
|
||||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||||
@@ -89,7 +109,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a transaction executed by a client against
|
/// Report a transaction executed by a client against
|
||||||
@@ -102,7 +122,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server identified by `address_id`.
|
/// Report data sent to a server identified by `address_id`.
|
||||||
@@ -115,7 +135,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data received from a server identified by `address_id`.
|
/// Report data received from a server identified by `address_id`.
|
||||||
@@ -128,7 +148,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Time spent waiting to get a healthy connection from the pool
|
/// Time spent waiting to get a healthy connection from the pool
|
||||||
@@ -142,7 +162,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` waiting for a connection
|
/// Reports a client identified by `process_id` waiting for a connection
|
||||||
@@ -155,7 +175,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||||
@@ -168,7 +188,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done querying the server
|
/// Reports a client identified by `process_id` is done querying the server
|
||||||
@@ -181,7 +201,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
||||||
@@ -194,7 +214,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -208,7 +228,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -222,7 +242,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -236,7 +256,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -250,7 +270,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
||||||
@@ -263,7 +283,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
self.send(event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user