mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Add SHOW CLIENTS / SHOW SERVERS + Stats refactor and tests (#159)
* wip * Main Thread Panic when swarmed with clients * fix * fix * 1024 * fix * remove test * Add SHOW CLIENTS * revert * fmt * Refactor + tests * fmt * add test * Add SHOW SERVERS + Make PR unreviewable * prometheus * add state to clients and servers * fmt * Add application_name to server stats * Add tests for waiting clients * Docs * remove comment * comments * typo * cleanup * CI
This commit is contained in:
committed by
GitHub
parent
075167431d
commit
4ae1bc8d32
197
src/admin.rs
197
src/admin.rs
@@ -2,12 +2,15 @@
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{info, trace};
|
||||
use std::collections::HashMap;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::config::{get_config, reload_config, VERSION};
|
||||
use crate::errors::Error;
|
||||
use crate::messages::*;
|
||||
use crate::pool::get_all_pools;
|
||||
use crate::stats::get_stats;
|
||||
use crate::stats::{
|
||||
get_address_stats, get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState,
|
||||
};
|
||||
use crate::ClientServerMap;
|
||||
|
||||
pub fn generate_server_info_for_admin() -> BytesMut {
|
||||
@@ -72,6 +75,14 @@ where
|
||||
trace!("SHOW POOLS");
|
||||
show_pools(stream).await
|
||||
}
|
||||
"CLIENTS" => {
|
||||
trace!("SHOW CLIENTS");
|
||||
show_clients(stream).await
|
||||
}
|
||||
"SERVERS" => {
|
||||
trace!("SHOW SERVERS");
|
||||
show_servers(stream).await
|
||||
}
|
||||
"STATS" => {
|
||||
trace!("SHOW STATS");
|
||||
show_stats(stream).await
|
||||
@@ -91,7 +102,8 @@ async fn show_lists<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let stats = get_stats();
|
||||
let client_stats = get_client_stats();
|
||||
let server_stats = get_server_stats();
|
||||
|
||||
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
|
||||
|
||||
@@ -111,18 +123,18 @@ where
|
||||
res.put(data_row(&vec!["pools".to_string(), databases.to_string()]));
|
||||
res.put(data_row(&vec![
|
||||
"free_clients".to_string(),
|
||||
stats
|
||||
client_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["cl_idle"])
|
||||
.sum::<i64>()
|
||||
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Idle)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_clients".to_string(),
|
||||
stats
|
||||
client_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["cl_active"])
|
||||
.sum::<i64>()
|
||||
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Active)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
@@ -131,18 +143,18 @@ where
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"free_servers".to_string(),
|
||||
stats
|
||||
server_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["sv_idle"])
|
||||
.sum::<i64>()
|
||||
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Idle)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_servers".to_string(),
|
||||
stats
|
||||
server_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["sv_active"])
|
||||
.sum::<i64>()
|
||||
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Active)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
|
||||
@@ -182,11 +194,12 @@ async fn show_pools<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let stats = get_stats();
|
||||
let all_pool_stats = get_pool_stats();
|
||||
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("pool_mode", DataType::Text),
|
||||
("cl_idle", DataType::Numeric),
|
||||
("cl_active", DataType::Numeric),
|
||||
("cl_waiting", DataType::Numeric),
|
||||
@@ -198,32 +211,27 @@ where
|
||||
("sv_login", DataType::Numeric),
|
||||
("maxwait", DataType::Numeric),
|
||||
("maxwait_us", DataType::Numeric),
|
||||
("pool_mode", DataType::Text),
|
||||
];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
for (_, pool) in get_all_pools() {
|
||||
for ((pool_name, username), pool) in get_all_pools() {
|
||||
let def = HashMap::default();
|
||||
let pool_stats = all_pool_stats
|
||||
.get(&(pool_name.clone(), username.clone()))
|
||||
.unwrap_or(&def);
|
||||
|
||||
let pool_config = &pool.settings;
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let stats = match stats.get(&address.id) {
|
||||
Some(stats) => stats.clone(),
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let mut row = vec![address.name(), pool_config.user.username.clone()];
|
||||
|
||||
for column in &columns[2..columns.len() - 1] {
|
||||
let value = stats.get(column.0).unwrap_or(&0).to_string();
|
||||
row.push(value);
|
||||
}
|
||||
|
||||
row.push(pool_config.pool_mode.to_string());
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
let mut row = vec![
|
||||
pool_name.clone(),
|
||||
username.clone(),
|
||||
pool_config.pool_mode.to_string(),
|
||||
];
|
||||
for column in &columns[3..columns.len()] {
|
||||
let value = pool_stats.get(column.0).unwrap_or(&0).to_string();
|
||||
row.push(value);
|
||||
}
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
@@ -387,6 +395,7 @@ where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let columns = vec![
|
||||
("instance", DataType::Text),
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("total_xact_count", DataType::Numeric),
|
||||
@@ -396,32 +405,32 @@ where
|
||||
("total_xact_time", DataType::Numeric),
|
||||
("total_query_time", DataType::Numeric),
|
||||
("total_wait_time", DataType::Numeric),
|
||||
("total_errors", DataType::Numeric),
|
||||
("avg_xact_count", DataType::Numeric),
|
||||
("avg_query_count", DataType::Numeric),
|
||||
("avg_recv", DataType::Numeric),
|
||||
("avg_sent", DataType::Numeric),
|
||||
("avg_errors", DataType::Numeric),
|
||||
("avg_xact_time", DataType::Numeric),
|
||||
("avg_query_time", DataType::Numeric),
|
||||
("avg_wait_time", DataType::Numeric),
|
||||
];
|
||||
|
||||
let stats = get_stats();
|
||||
let all_stats = get_address_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for ((_db_name, username), pool) in get_all_pools() {
|
||||
for ((db, username), pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let stats = match stats.get(&address.id) {
|
||||
let stats = match all_stats.get(&address.id) {
|
||||
Some(stats) => stats.clone(),
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let mut row = vec![address.name()];
|
||||
row.push(username.clone());
|
||||
|
||||
for column in &columns[2..] {
|
||||
let mut row = vec![address.name(), db.clone(), username.clone()];
|
||||
for column in &columns[3..] {
|
||||
row.push(stats.get(column.0).unwrap_or(&0).to_string());
|
||||
}
|
||||
|
||||
@@ -439,3 +448,107 @@ where
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// Show currently connected clients
|
||||
async fn show_clients<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let columns = vec![
|
||||
("client_id", DataType::Text),
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("application_name", DataType::Text),
|
||||
("state", DataType::Text),
|
||||
("transaction_count", DataType::Numeric),
|
||||
("query_count", DataType::Numeric),
|
||||
("error_count", DataType::Numeric),
|
||||
("age_seconds", DataType::Numeric),
|
||||
];
|
||||
|
||||
let new_map = get_client_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for (_, client) in new_map {
|
||||
let row = vec![
|
||||
format!("{:#010X}", client.client_id),
|
||||
client.pool_name,
|
||||
client.username,
|
||||
client.application_name.clone(),
|
||||
client.state.to_string(),
|
||||
client.transaction_count.to_string(),
|
||||
client.query_count.to_string(),
|
||||
client.error_count.to_string(),
|
||||
Instant::now()
|
||||
.duration_since(client.connect_time)
|
||||
.as_secs()
|
||||
.to_string(),
|
||||
];
|
||||
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// Show currently connected servers
|
||||
async fn show_servers<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let columns = vec![
|
||||
("server_id", DataType::Text),
|
||||
("database_name", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("address_id", DataType::Text),
|
||||
("application_name", DataType::Text),
|
||||
("state", DataType::Text),
|
||||
("transaction_count", DataType::Numeric),
|
||||
("query_count", DataType::Numeric),
|
||||
("bytes_sent", DataType::Numeric),
|
||||
("bytes_received", DataType::Numeric),
|
||||
("age_seconds", DataType::Numeric),
|
||||
];
|
||||
|
||||
let new_map = get_server_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for (_, server) in new_map {
|
||||
let row = vec![
|
||||
format!("{:#010X}", server.server_id),
|
||||
server.pool_name,
|
||||
server.username,
|
||||
server.address_name,
|
||||
server.application_name,
|
||||
server.state.to_string(),
|
||||
server.transaction_count.to_string(),
|
||||
server.query_count.to_string(),
|
||||
server.bytes_sent.to_string(),
|
||||
server.bytes_received.to_string(),
|
||||
Instant::now()
|
||||
.duration_since(server.connect_time)
|
||||
.as_secs()
|
||||
.to_string(),
|
||||
];
|
||||
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
@@ -577,6 +577,12 @@ where
|
||||
// The query router determines where the query is going to go,
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
self.stats.client_register(
|
||||
self.process_id,
|
||||
self.pool_name.clone(),
|
||||
self.username.clone(),
|
||||
self.application_name.clone(),
|
||||
);
|
||||
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
@@ -764,15 +770,12 @@ where
|
||||
server.claim(self.process_id, self.secret_key);
|
||||
self.connected_to_server = true;
|
||||
|
||||
// Update statistics.
|
||||
if let Some(last_address_id) = self.last_address_id {
|
||||
self.stats
|
||||
.client_disconnecting(self.process_id, last_address_id);
|
||||
}
|
||||
self.stats.client_active(self.process_id, address.id);
|
||||
// Update statistics
|
||||
self.stats
|
||||
.client_active(self.process_id, server.server_id());
|
||||
|
||||
self.last_address_id = Some(address.id);
|
||||
self.last_server_id = Some(server.process_id());
|
||||
self.last_server_id = Some(server.server_id());
|
||||
|
||||
debug!(
|
||||
"Client {:?} talking to server {:?}",
|
||||
@@ -830,7 +833,7 @@ where
|
||||
|
||||
if !server.in_transaction() {
|
||||
// Report transaction executed statistics.
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
@@ -908,7 +911,7 @@ where
|
||||
self.buffer.clear();
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
@@ -943,7 +946,7 @@ where
|
||||
};
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
@@ -964,11 +967,11 @@ where
|
||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||
debug!("Releasing server back into the pool");
|
||||
server.checkin_cleanup().await?;
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
self.stats.server_idle(server.server_id());
|
||||
self.connected_to_server = false;
|
||||
|
||||
self.release();
|
||||
self.stats.client_idle(self.process_id, address.id);
|
||||
self.stats.client_idle(self.process_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1010,7 +1013,7 @@ where
|
||||
}
|
||||
|
||||
// Report query executed statistics.
|
||||
self.stats.query(self.process_id, address.id);
|
||||
self.stats.query(self.process_id, server.server_id());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1091,14 +1094,9 @@ impl<S, T> Drop for Client<S, T> {
|
||||
|
||||
// Dirty shutdown
|
||||
// TODO: refactor, this is not the best way to handle state management.
|
||||
if let Some(address_id) = self.last_address_id {
|
||||
self.stats.client_disconnecting(self.process_id, address_id);
|
||||
|
||||
if self.connected_to_server {
|
||||
if let Some(process_id) = self.last_server_id {
|
||||
self.stats.server_idle(process_id, address_id);
|
||||
}
|
||||
}
|
||||
self.stats.client_disconnecting(self.process_id);
|
||||
if self.connected_to_server && self.last_server_id.is_some() {
|
||||
self.stats.server_idle(self.last_server_id.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
55
src/pool.rs
55
src/pool.rs
@@ -332,7 +332,7 @@ impl ConnectionPool {
|
||||
}
|
||||
|
||||
// Indicate we're waiting on a server connection from a pool.
|
||||
self.stats.client_waiting(process_id, address.id);
|
||||
self.stats.client_waiting(process_id);
|
||||
|
||||
// Check if we can connect
|
||||
let mut conn = match self.databases[address.shard][address.address_index]
|
||||
@@ -343,8 +343,7 @@ impl ConnectionPool {
|
||||
Err(err) => {
|
||||
error!("Banning instance {:?}, error: {:?}", address, err);
|
||||
self.ban(&address, process_id);
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.client_checkout_error(process_id, address.id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -361,14 +360,14 @@ impl ConnectionPool {
|
||||
// Health checks are pretty expensive.
|
||||
if !require_healthcheck {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, server.server_id());
|
||||
self.stats.server_active(process_id, server.server_id());
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
debug!("Running health check on server {:?}", address);
|
||||
|
||||
self.stats.server_tested(server.process_id(), address.id);
|
||||
self.stats.server_tested(server.server_id());
|
||||
|
||||
match tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(healthcheck_timeout),
|
||||
@@ -379,9 +378,12 @@ impl ConnectionPool {
|
||||
// Check if health check succeeded.
|
||||
Ok(res) => match res {
|
||||
Ok(_) => {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
self.stats.checkout_time(
|
||||
now.elapsed().as_micros(),
|
||||
process_id,
|
||||
conn.server_id(),
|
||||
);
|
||||
self.stats.server_active(process_id, conn.server_id());
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
@@ -421,10 +423,9 @@ impl ConnectionPool {
|
||||
/// Ban an address (i.e. replica). It no longer will serve
|
||||
/// traffic for any new transactions. Existing transactions on that replica
|
||||
/// will finish successfully or error out to the clients.
|
||||
pub fn ban(&self, address: &Address, process_id: i32) {
|
||||
self.stats.client_disconnecting(process_id, address.id);
|
||||
|
||||
pub fn ban(&self, address: &Address, client_id: i32) {
|
||||
error!("Banning {:?}", address);
|
||||
self.stats.client_ban_error(client_id, address.id);
|
||||
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
let mut guard = self.banlist.write();
|
||||
@@ -560,14 +561,20 @@ impl ManageConnection for ServerPool {
|
||||
/// Attempts to create a new connection.
|
||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||
info!("Creating a new server connection {:?}", self.address);
|
||||
let server_id = rand::random::<i32>();
|
||||
|
||||
// Put a temporary process_id into the stats
|
||||
// for server login.
|
||||
let process_id = rand::random::<i32>();
|
||||
self.stats.server_login(process_id, self.address.id);
|
||||
self.stats.server_register(
|
||||
server_id,
|
||||
self.address.id,
|
||||
self.address.name(),
|
||||
self.address.pool_name.clone(),
|
||||
self.address.username.clone(),
|
||||
);
|
||||
self.stats.server_login(server_id);
|
||||
|
||||
// Connect to the PostgreSQL server.
|
||||
match Server::startup(
|
||||
server_id,
|
||||
&self.address,
|
||||
&self.user,
|
||||
&self.database,
|
||||
@@ -577,13 +584,11 @@ impl ManageConnection for ServerPool {
|
||||
.await
|
||||
{
|
||||
Ok(conn) => {
|
||||
// Remove the temporary process_id from the stats.
|
||||
self.stats.server_disconnecting(process_id, self.address.id);
|
||||
self.stats.server_idle(server_id);
|
||||
Ok(conn)
|
||||
}
|
||||
Err(err) => {
|
||||
// Remove the temporary process_id from the stats.
|
||||
self.stats.server_disconnecting(process_id, self.address.id);
|
||||
self.stats.server_disconnecting(server_id);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
@@ -608,6 +613,11 @@ pub fn get_pool(db: String, user: String) -> Option<ConnectionPool> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a pointer to all configured pools.
|
||||
pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> {
|
||||
return (*(*POOLS.load())).clone();
|
||||
}
|
||||
|
||||
/// How many total servers we have in the config.
|
||||
pub fn get_number_of_addresses() -> usize {
|
||||
get_all_pools()
|
||||
@@ -615,8 +625,3 @@ pub fn get_number_of_addresses() -> usize {
|
||||
.map(|(_, pool)| pool.databases())
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Get a pointer to all configured pools.
|
||||
pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> {
|
||||
return (*(*POOLS.load())).clone();
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::net::SocketAddr;
|
||||
|
||||
use crate::config::Address;
|
||||
use crate::pool::get_all_pools;
|
||||
use crate::stats::get_stats;
|
||||
use crate::stats::get_address_stats;
|
||||
|
||||
struct MetricHelpType {
|
||||
help: &'static str,
|
||||
@@ -164,7 +164,7 @@ impl PrometheusMetric {
|
||||
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
|
||||
match (request.method(), request.uri().path()) {
|
||||
(&Method::GET, "/metrics") => {
|
||||
let stats = get_stats();
|
||||
let stats: HashMap<usize, HashMap<String, i64>> = get_address_stats();
|
||||
|
||||
let mut lines = Vec::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
|
||||
@@ -20,6 +20,8 @@ use crate::ClientServerMap;
|
||||
|
||||
/// Server state.
|
||||
pub struct Server {
|
||||
server_id: i32,
|
||||
|
||||
/// Server host, e.g. localhost,
|
||||
/// port, e.g. 5432, and role, e.g. primary or replica.
|
||||
address: Address,
|
||||
@@ -72,6 +74,7 @@ impl Server {
|
||||
/// Pretend to be the Postgres client and connect to the server given host, port and credentials.
|
||||
/// Perform the authentication and return the server in a ready for query state.
|
||||
pub async fn startup(
|
||||
server_id: i32,
|
||||
address: &Address,
|
||||
user: &User,
|
||||
database: &str,
|
||||
@@ -315,6 +318,7 @@ impl Server {
|
||||
write: write,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
server_info: server_info,
|
||||
server_id: server_id,
|
||||
process_id: process_id,
|
||||
secret_key: secret_key,
|
||||
in_transaction: false,
|
||||
@@ -372,8 +376,7 @@ impl Server {
|
||||
|
||||
/// Send messages to the server from the client.
|
||||
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
||||
self.stats
|
||||
.data_sent(messages.len(), self.process_id, self.address.id);
|
||||
self.stats.data_sent(messages.len(), self.server_id);
|
||||
|
||||
match write_all_half(&mut self.write, messages).await {
|
||||
Ok(_) => {
|
||||
@@ -505,8 +508,7 @@ impl Server {
|
||||
let bytes = self.buffer.clone();
|
||||
|
||||
// Keep track of how much data we got from the server for stats.
|
||||
self.stats
|
||||
.data_received(bytes.len(), self.process_id, self.address.id);
|
||||
self.stats.data_received(bytes.len(), self.server_id);
|
||||
|
||||
// Clear the buffer for next query.
|
||||
self.buffer.clear();
|
||||
@@ -629,9 +631,10 @@ impl Server {
|
||||
self.address.clone()
|
||||
}
|
||||
|
||||
/// Get the server's unique identifier.
|
||||
pub fn process_id(&self) -> i32 {
|
||||
self.process_id
|
||||
/// Get the server connection identifier
|
||||
/// Used to uniquely identify connection in statistics
|
||||
pub fn server_id(&self) -> i32 {
|
||||
self.server_id
|
||||
}
|
||||
|
||||
// Get server's latest response timestamp
|
||||
@@ -650,8 +653,7 @@ impl Drop for Server {
|
||||
/// the socket is in non-blocking mode, so it may not be ready
|
||||
/// for a write.
|
||||
fn drop(&mut self) {
|
||||
self.stats
|
||||
.server_disconnecting(self.process_id(), self.address.id);
|
||||
self.stats.server_disconnecting(self.server_id);
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(4);
|
||||
bytes.put_u8(b'X');
|
||||
|
||||
1111
src/stats.rs
1111
src/stats.rs
File diff suppressed because it is too large
Load Diff
214
tests/ruby/admin_spec.rb
Normal file
214
tests/ruby/admin_spec.rb
Normal file
@@ -0,0 +1,214 @@
|
||||
# frozen_string_literal: true
|
||||
require 'uri'
|
||||
require_relative 'spec_helper'
|
||||
|
||||
describe "Admin" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
|
||||
let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }
|
||||
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
describe "SHOW POOLS" do
|
||||
context "bad credentials" do
|
||||
it "does not change any stats" do
|
||||
bad_passsword_url = URI(pgcat_conn_str)
|
||||
bad_passsword_url.password = "wrong"
|
||||
expect { PG::connect("#{bad_passsword_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
|
||||
|
||||
sleep(1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
end
|
||||
end
|
||||
|
||||
context "bad database name" do
|
||||
it "does not change any stats" do
|
||||
bad_db_url = URI(pgcat_conn_str)
|
||||
bad_db_url.path = "/wrong_db"
|
||||
expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad)
|
||||
|
||||
sleep(1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
end
|
||||
end
|
||||
|
||||
context "client connects but issues no queries" do
|
||||
it "only affects cl_idle stats" do
|
||||
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
||||
sleep(1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("20")
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
|
||||
connections.map(&:close)
|
||||
sleep(1.1)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
end
|
||||
end
|
||||
|
||||
context "clients connect and make one query" do
|
||||
it "only affects cl_idle, sv_idle stats" do
|
||||
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
Thread.new { c.async_exec("SELECT pg_sleep(2.5)") }
|
||||
end
|
||||
|
||||
sleep(1.1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_active"]).to eq("5")
|
||||
expect(results["sv_active"]).to eq("5")
|
||||
|
||||
sleep(3)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("5")
|
||||
expect(results["sv_idle"]).to eq("5")
|
||||
|
||||
connections.map(&:close)
|
||||
sleep(1)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["sv_idle"]).to eq("5")
|
||||
end
|
||||
end
|
||||
|
||||
context "client connects and opens a transaction and closes connection uncleanly" do
|
||||
it "produces correct statistics" do
|
||||
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
Thread.new do
|
||||
c.async_exec("BEGIN")
|
||||
c.async_exec("SELECT pg_sleep(0.01)")
|
||||
c.close
|
||||
end
|
||||
end
|
||||
|
||||
sleep(1.1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["sv_idle"]).to eq("5")
|
||||
end
|
||||
end
|
||||
|
||||
context "clients overwhelm server pools" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
|
||||
|
||||
it "cl_waiting is updated to show it" do
|
||||
threads = []
|
||||
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
|
||||
end
|
||||
|
||||
sleep(1.1) # Allow time for stats to update
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
|
||||
expect(results["cl_waiting"]).to eq("2")
|
||||
expect(results["cl_active"]).to eq("2")
|
||||
expect(results["sv_active"]).to eq("2")
|
||||
|
||||
sleep(2.5) # Allow time for stats to update
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("4")
|
||||
expect(results["sv_idle"]).to eq("2")
|
||||
|
||||
threads.map(&:join)
|
||||
connections.map(&:close)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "SHOW CLIENTS" do
|
||||
it "reports correct number and application names" do
|
||||
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
|
||||
connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") }
|
||||
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
|
||||
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||
expect(results.count).to eq(21) # count admin clients
|
||||
expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8)
|
||||
expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1)
|
||||
|
||||
connections[0..5].map(&:close)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||
expect(results.count).to eq(15)
|
||||
|
||||
connections[6..].map(&:close)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1)
|
||||
admin_conn.close
|
||||
end
|
||||
|
||||
it "reports correct number of queries and transactions" do
|
||||
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
|
||||
|
||||
connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") }
|
||||
connections.each do |c|
|
||||
c.async_exec("SELECT 1")
|
||||
c.async_exec("SELECT 2")
|
||||
c.async_exec("SELECT 3")
|
||||
c.async_exec("BEGIN")
|
||||
c.async_exec("SELECT 4")
|
||||
c.async_exec("SELECT 5")
|
||||
c.async_exec("COMMIT")
|
||||
end
|
||||
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
|
||||
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||
expect(results.count).to eq(3)
|
||||
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
|
||||
expect(normal_client_results[0]["transaction_count"]).to eq("4")
|
||||
expect(normal_client_results[1]["transaction_count"]).to eq("4")
|
||||
expect(normal_client_results[0]["query_count"]).to eq("7")
|
||||
expect(normal_client_results[1]["query_count"]).to eq("7")
|
||||
|
||||
admin_conn.close
|
||||
connections.map(&:close)
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -46,6 +46,50 @@ module Helpers
|
||||
end
|
||||
end
|
||||
|
||||
def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
"pool_size" => pool_size,
|
||||
"statement_timeout" => 0,
|
||||
"username" => "sharding_user"
|
||||
}
|
||||
|
||||
pgcat = PgcatProcess.new("trace")
|
||||
pgcat_cfg = pgcat.current_config
|
||||
|
||||
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||
|
||||
# Main proxy configs
|
||||
pgcat_cfg["pools"] = {
|
||||
"#{pool_name}" => {
|
||||
"default_role" => "primary",
|
||||
"pool_mode" => pool_mode,
|
||||
"primary_reads_enabled" => false,
|
||||
"query_parser_enabled" => false,
|
||||
"sharding_function" => "pg_bigint_hash",
|
||||
"shards" => {
|
||||
"0" => {
|
||||
"database" => "shard0",
|
||||
"servers" => [
|
||||
["localhost", primary.port.to_s, "primary"]
|
||||
]
|
||||
},
|
||||
},
|
||||
"users" => { "0" => user }
|
||||
}
|
||||
}
|
||||
pgcat_cfg["general"]["port"] = pgcat.port
|
||||
pgcat.update_config(pgcat_cfg)
|
||||
pgcat.start
|
||||
pgcat.wait_until_ready
|
||||
|
||||
OpenStruct.new.tap do |struct|
|
||||
struct.pgcat = pgcat
|
||||
struct.primary = primary
|
||||
struct.all_databases = [primary]
|
||||
end
|
||||
end
|
||||
|
||||
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
|
||||
@@ -111,6 +111,6 @@ class PgcatProcess
|
||||
username = cfg["pools"][first_pool_name]["users"]["0"]["username"]
|
||||
password = cfg["pools"][first_pool_name]["users"]["0"]["password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}"
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}?application_name=example_app"
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user