Refactor stats to use atomics (#375)

* Refactor stats to use atomics

When we are dealing with a high number of connections, generated
stats cannot be consumed fast enough by the stats collector loop.
This makes the stats subsystem inconsistent and a lot of
warning messages are thrown due to unregistered servers/clients.

This change refactors the stats subsystem so it uses atomics:

- Now counters are handled using U64 atomics
- Event system is dropped and averages are calculated using a loop
  every 15 seconds.
- Now, instead of snapshots being generated every second we keep track of servers/clients
  that have registered. Each pool/server/client has its own instance of the counter and
  makes changes directly, instead of adding an event that gets processed later.

* Manually implement Hash/Eq in `config::Address` ignoring stats

* Add tests for client connection counters

* Allow connecting to dockerized dev pgcat from the host

* stats: Decrease cl_idle when idle socket disconnects
This commit is contained in:
Jose Fernández
2023-03-28 17:19:37 +02:00
committed by GitHub
parent 9a2076a9eb
commit 58ce76d9b9
19 changed files with 1303 additions and 1182 deletions

12
Cargo.lock generated
View File

@@ -37,6 +37,17 @@ dependencies = [
"syn 2.0.9", "syn 2.0.9",
] ]
[[package]]
name = "atomic_enum"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6227a8d6fdb862bcb100c4314d0d9579e5cd73fa6df31a2e6f6e1acd3c5f1207"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@@ -720,6 +731,7 @@ version = "1.0.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"atomic_enum",
"base64", "base64",
"bb8", "bb8",
"bytes", "bytes",

View File

@@ -37,6 +37,7 @@ exitcode = "1.1.2"
futures = "0.3" futures = "0.3"
socket2 = { version = "0.4.7", features = ["all"] } socket2 = { version = "0.4.7", features = ["all"] }
nix = "0.26.2" nix = "0.26.2"
atomic_enum = "0.2.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies] [target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0" jemallocator = "0.5.0"

View File

@@ -26,6 +26,8 @@ x-common-env-pg:
services: services:
main: main:
image: kubernetes/pause image: kubernetes/pause
ports:
- 6432
pg1: pg1:
<<: *common-definition-pg <<: *common-definition-pg

View File

@@ -1,10 +1,11 @@
use crate::pool::BanReason; use crate::pool::BanReason;
/// Admin database.
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::{error, info, trace}; use log::{error, info, trace};
use nix::sys::signal::{self, Signal}; use nix::sys::signal::{self, Signal};
use nix::unistd::Pid; use nix::unistd::Pid;
use std::collections::HashMap; use std::collections::HashMap;
/// Admin database.
use std::sync::atomic::Ordering;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::Instant; use tokio::time::Instant;
@@ -12,9 +13,7 @@ use crate::config::{get_config, reload_config, VERSION};
use crate::errors::Error; use crate::errors::Error;
use crate::messages::*; use crate::messages::*;
use crate::pool::{get_all_pools, get_pool}; use crate::pool::{get_all_pools, get_pool};
use crate::stats::{ use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
get_address_stats, get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState,
};
use crate::ClientServerMap; use crate::ClientServerMap;
pub fn generate_server_info_for_admin() -> BytesMut { pub fn generate_server_info_for_admin() -> BytesMut {
@@ -158,7 +157,14 @@ where
"free_clients".to_string(), "free_clients".to_string(),
client_stats client_stats
.keys() .keys()
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Idle) .filter(|client_id| {
client_stats
.get(client_id)
.unwrap()
.state
.load(Ordering::Relaxed)
== ClientState::Idle
})
.count() .count()
.to_string(), .to_string(),
])); ]));
@@ -166,7 +172,14 @@ where
"used_clients".to_string(), "used_clients".to_string(),
client_stats client_stats
.keys() .keys()
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Active) .filter(|client_id| {
client_stats
.get(client_id)
.unwrap()
.state
.load(Ordering::Relaxed)
== ClientState::Active
})
.count() .count()
.to_string(), .to_string(),
])); ]));
@@ -178,7 +191,14 @@ where
"free_servers".to_string(), "free_servers".to_string(),
server_stats server_stats
.keys() .keys()
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Idle) .filter(|server_id| {
server_stats
.get(server_id)
.unwrap()
.state
.load(Ordering::Relaxed)
== ServerState::Idle
})
.count() .count()
.to_string(), .to_string(),
])); ]));
@@ -186,7 +206,14 @@ where
"used_servers".to_string(), "used_servers".to_string(),
server_stats server_stats
.keys() .keys()
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Active) .filter(|server_id| {
server_stats
.get(server_id)
.unwrap()
.state
.load(Ordering::Relaxed)
== ServerState::Active
})
.count() .count()
.to_string(), .to_string(),
])); ]));
@@ -248,28 +275,15 @@ where
let mut res = BytesMut::new(); let mut res = BytesMut::new();
res.put(row_description(&columns)); res.put(row_description(&columns));
for (user_pool, pool) in get_all_pools() {
let def = HashMap::default();
let pool_stats = all_pool_stats
.get(&(user_pool.db.clone(), user_pool.user.clone()))
.unwrap_or(&def);
let pool_config = &pool.settings; for ((_user_pool, _pool), pool_stats) in all_pool_stats {
let mut row = vec![ let mut row = vec![
user_pool.db.clone(), pool_stats.database(),
user_pool.user.clone(), pool_stats.user(),
pool_config.pool_mode.to_string(), pool_stats.pool_mode().to_string(),
]; ];
for column in &columns[3..columns.len()] { pool_stats.populate_row(&mut row);
let value = match column.0 { pool_stats.clear_maxwait();
"maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(),
"maxwait_us" => {
(pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string()
}
_other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(),
};
row.push(value);
}
res.put(data_row(&row)); res.put(data_row(&row));
} }
@@ -400,7 +414,7 @@ where
for (id, pool) in get_all_pools().iter() { for (id, pool) in get_all_pools().iter() {
for address in pool.get_addresses_from_host(host) { for address in pool.get_addresses_from_host(host) {
if !pool.is_banned(&address) { if !pool.is_banned(&address) {
pool.ban(&address, BanReason::AdminBan(duration_seconds), -1); pool.ban(&address, BanReason::AdminBan(duration_seconds), None);
res.put(data_row(&vec![ res.put(data_row(&vec![
id.db.clone(), id.db.clone(),
id.user.clone(), id.user.clone(),
@@ -617,7 +631,6 @@ where
("avg_wait_time", DataType::Numeric), ("avg_wait_time", DataType::Numeric),
]; ];
let all_stats = get_address_stats();
let mut res = BytesMut::new(); let mut res = BytesMut::new();
res.put(row_description(&columns)); res.put(row_description(&columns));
@@ -625,15 +638,10 @@ where
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
let address = pool.address(shard, server); let address = pool.address(shard, server);
let stats = match all_stats.get(&address.id) {
Some(stats) => stats.clone(),
None => HashMap::new(),
};
let mut row = vec![address.name(), user_pool.db.clone(), user_pool.user.clone()]; let mut row = vec![address.name(), user_pool.db.clone(), user_pool.user.clone()];
for column in &columns[3..] { let stats = address.stats.clone();
row.push(stats.get(column.0).unwrap_or(&0).to_string()); stats.populate_row(&mut row);
}
res.put(data_row(&row)); res.put(data_row(&row));
} }
@@ -673,16 +681,16 @@ where
for (_, client) in new_map { for (_, client) in new_map {
let row = vec![ let row = vec![
format!("{:#010X}", client.client_id), format!("{:#010X}", client.client_id()),
client.pool_name, client.pool_name(),
client.username, client.username(),
client.application_name.clone(), client.application_name(),
client.state.to_string(), client.state.load(Ordering::Relaxed).to_string(),
client.transaction_count.to_string(), client.transaction_count.load(Ordering::Relaxed).to_string(),
client.query_count.to_string(), client.query_count.load(Ordering::Relaxed).to_string(),
client.error_count.to_string(), client.error_count.load(Ordering::Relaxed).to_string(),
Instant::now() Instant::now()
.duration_since(client.connect_time) .duration_since(client.connect_time())
.as_secs() .as_secs()
.to_string(), .to_string(),
]; ];
@@ -724,19 +732,20 @@ where
res.put(row_description(&columns)); res.put(row_description(&columns));
for (_, server) in new_map { for (_, server) in new_map {
let application_name = server.application_name.read();
let row = vec![ let row = vec![
format!("{:#010X}", server.server_id), format!("{:#010X}", server.server_id()),
server.pool_name, server.pool_name(),
server.username, server.username(),
server.address_name, server.address_name(),
server.application_name, application_name.clone(),
server.state.to_string(), server.state.load(Ordering::Relaxed).to_string(),
server.transaction_count.to_string(), server.transaction_count.load(Ordering::Relaxed).to_string(),
server.query_count.to_string(), server.query_count.load(Ordering::Relaxed).to_string(),
server.bytes_sent.to_string(), server.bytes_sent.load(Ordering::Relaxed).to_string(),
server.bytes_received.to_string(), server.bytes_received.load(Ordering::Relaxed).to_string(),
Instant::now() Instant::now()
.duration_since(server.connect_time) .duration_since(server.connect_time())
.as_secs() .as_secs()
.to_string(), .to_string(),
]; ];

View File

@@ -3,8 +3,8 @@ use crate::pool::BanReason;
/// Handle clients by pretending to be a PostgreSQL server. /// Handle clients by pretending to be a PostgreSQL server.
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf}; use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
use tokio::net::TcpStream; use tokio::net::TcpStream;
@@ -19,7 +19,7 @@ use crate::messages::*;
use crate::pool::{get_pool, ClientServerMap, ConnectionPool}; use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
use crate::query_router::{Command, QueryRouter}; use crate::query_router::{Command, QueryRouter};
use crate::server::Server; use crate::server::Server;
use crate::stats::{get_reporter, Reporter}; use crate::stats::{ClientStats, PoolStats, ServerStats};
use crate::tls::Tls; use crate::tls::Tls;
use tokio_rustls::server::TlsStream; use tokio_rustls::server::TlsStream;
@@ -66,8 +66,8 @@ pub struct Client<S, T> {
#[allow(dead_code)] #[allow(dead_code)]
parameters: HashMap<String, String>, parameters: HashMap<String, String>,
/// Statistics /// Statistics related to this client
stats: Reporter, stats: Arc<ClientStats>,
/// Clients want to talk to admin database. /// Clients want to talk to admin database.
admin: bool, admin: bool,
@@ -75,8 +75,8 @@ pub struct Client<S, T> {
/// Last address the client talked to. /// Last address the client talked to.
last_address_id: Option<usize>, last_address_id: Option<usize>,
/// Last server process id we talked to. /// Last server process stats we talked to.
last_server_id: Option<i32>, last_server_stats: Option<Arc<ServerStats>>,
/// Connected to server /// Connected to server
connected_to_server: bool, connected_to_server: bool,
@@ -135,6 +135,10 @@ pub async fn client_entrypoint(
if !client.is_admin() { if !client.is_admin() {
let _ = drain.send(-1).await; let _ = drain.send(-1).await;
if result.is_err() {
client.stats.disconnect();
}
} }
result result
@@ -183,6 +187,10 @@ pub async fn client_entrypoint(
if !client.is_admin() { if !client.is_admin() {
let _ = drain.send(-1).await; let _ = drain.send(-1).await;
if result.is_err() {
client.stats.disconnect();
}
} }
result result
@@ -233,6 +241,10 @@ pub async fn client_entrypoint(
if !client.is_admin() { if !client.is_admin() {
let _ = drain.send(-1).await; let _ = drain.send(-1).await;
if result.is_err() {
client.stats.disconnect();
}
} }
result result
@@ -258,8 +270,11 @@ pub async fn client_entrypoint(
if !client.is_admin() { if !client.is_admin() {
let _ = drain.send(-1).await; let _ = drain.send(-1).await;
}
if result.is_err() {
client.stats.disconnect();
}
}
result result
} }
@@ -382,7 +397,6 @@ where
shutdown: Receiver<()>, shutdown: Receiver<()>,
admin_only: bool, admin_only: bool,
) -> Result<Client<S, T>, Error> { ) -> Result<Client<S, T>, Error> {
let stats = get_reporter();
let parameters = parse_startup(bytes.clone())?; let parameters = parse_startup(bytes.clone())?;
// This parameter is mandatory by the protocol. // This parameter is mandatory by the protocol.
@@ -537,6 +551,25 @@ where
ready_for_query(&mut write).await?; ready_for_query(&mut write).await?;
trace!("Startup OK"); trace!("Startup OK");
let pool_stats = match get_pool(pool_name, username) {
Some(pool) => {
if !admin {
pool.stats
} else {
Arc::new(PoolStats::default())
}
}
None => Arc::new(PoolStats::default()),
};
let stats = Arc::new(ClientStats::new(
process_id,
application_name,
username,
pool_name,
tokio::time::Instant::now(),
pool_stats,
));
Ok(Client { Ok(Client {
read: BufReader::new(read), read: BufReader::new(read),
@@ -552,7 +585,7 @@ where
stats, stats,
admin, admin,
last_address_id: None, last_address_id: None,
last_server_id: None, last_server_stats: None,
pool_name: pool_name.clone(), pool_name: pool_name.clone(),
username: username.clone(), username: username.clone(),
application_name: application_name.to_string(), application_name: application_name.to_string(),
@@ -583,10 +616,10 @@ where
secret_key, secret_key,
client_server_map, client_server_map,
parameters: HashMap::new(), parameters: HashMap::new(),
stats: get_reporter(), stats: Arc::new(ClientStats::default()),
admin: false, admin: false,
last_address_id: None, last_address_id: None,
last_server_id: None, last_server_stats: None,
pool_name: String::from("undefined"), pool_name: String::from("undefined"),
username: String::from("undefined"), username: String::from("undefined"),
application_name: String::from("undefined"), application_name: String::from("undefined"),
@@ -627,12 +660,8 @@ where
// The query router determines where the query is going to go, // The query router determines where the query is going to go,
// e.g. primary, replica, which shard. // e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new(); let mut query_router = QueryRouter::new();
self.stats.client_register(
self.process_id, self.stats.register(self.stats.clone());
self.pool_name.clone(),
self.username.clone(),
self.application_name.clone(),
);
// Our custom protocol loop. // Our custom protocol loop.
// We expect the client to either start a transaction with regular queries // We expect the client to either start a transaction with regular queries
@@ -656,6 +685,8 @@ where
&mut self.write, &mut self.write,
"terminating connection due to administrator command" "terminating connection due to administrator command"
).await?; ).await?;
self.stats.disconnect();
return Ok(()) return Ok(())
} }
@@ -708,6 +739,9 @@ where
'X' => { 'X' => {
debug!("Client disconnecting"); debug!("Client disconnecting");
self.stats.disconnect();
return Ok(()); return Ok(());
} }
@@ -757,7 +791,7 @@ where
current_shard, current_shard,
), ),
) )
.await?; .await?;
} else { } else {
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?; custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
} }
@@ -802,10 +836,13 @@ where
}; };
debug!("Waiting for connection from pool"); debug!("Waiting for connection from pool");
if !self.admin {
self.stats.waiting();
}
// Grab a server from the pool. // Grab a server from the pool.
let connection = match pool let connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id) .get(query_router.shard(), query_router.role(), &self.stats)
.await .await
{ {
Ok(conn) => { Ok(conn) => {
@@ -817,6 +854,8 @@ where
// but we were unable to grab a connection from the pool // but we were unable to grab a connection from the pool
// We'll send back an error message and clean the extended // We'll send back an error message and clean the extended
// protocol buffer // protocol buffer
self.stats.idle();
if message[0] as char == 'S' { if message[0] as char == 'S' {
error!("Got Sync message but failed to get a connection from the pool"); error!("Got Sync message but failed to get a connection from the pool");
self.buffer.clear(); self.buffer.clear();
@@ -825,7 +864,7 @@ where
.await?; .await?;
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}", error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err); self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
continue; continue;
} }
}; };
@@ -840,11 +879,10 @@ where
self.connected_to_server = true; self.connected_to_server = true;
// Update statistics // Update statistics
self.stats self.stats.active();
.client_active(self.process_id, server.server_id());
self.last_address_id = Some(address.id); self.last_address_id = Some(address.id);
self.last_server_id = Some(server.server_id()); self.last_server_stats = Some(server.stats());
debug!( debug!(
"Client {:?} talking to server {:?}", "Client {:?} talking to server {:?}",
@@ -885,6 +923,7 @@ where
Ok(Err(err)) => { Ok(Err(err)) => {
// Client disconnected inside a transaction. // Client disconnected inside a transaction.
// Clean up the server and re-use it. // Clean up the server and re-use it.
self.stats.disconnect();
server.checkin_cleanup().await?; server.checkin_cleanup().await?;
return Err(err); return Err(err);
@@ -917,16 +956,26 @@ where
'Q' => { 'Q' => {
debug!("Sending query to server"); debug!("Sending query to server");
self.send_and_receive_loop(code, Some(&message), server, &address, &pool) self.send_and_receive_loop(
.await?; code,
Some(&message),
server,
&address,
&pool,
&self.stats.clone(),
)
.await?;
if !server.in_transaction() { if !server.in_transaction() {
// Report transaction executed statistics. // Report transaction executed statistics.
self.stats.transaction(self.process_id, server.server_id()); self.stats.transaction();
server.stats().transaction(&self.application_name);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.idle();
break; break;
} }
} }
@@ -935,6 +984,7 @@ where
// Terminate // Terminate
'X' => { 'X' => {
server.checkin_cleanup().await?; server.checkin_cleanup().await?;
self.stats.disconnect();
self.release(); self.release();
return Ok(()); return Ok(());
@@ -987,13 +1037,21 @@ where
} }
} }
self.send_and_receive_loop(code, None, server, &address, &pool) self.send_and_receive_loop(
.await?; code,
None,
server,
&address,
&pool,
&self.stats.clone(),
)
.await?;
self.buffer.clear(); self.buffer.clear();
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(self.process_id, server.server_id()); self.stats.transaction();
server.stats().transaction(&self.application_name);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
@@ -1028,7 +1086,9 @@ where
// Clear the buffer // Clear the buffer
self.buffer.clear(); self.buffer.clear();
let response = self.receive_server_message(server, &address, &pool).await?; let response = self
.receive_server_message(server, &address, &pool, &self.stats.clone())
.await?;
match write_all_half(&mut self.write, &response).await { match write_all_half(&mut self.write, &response).await {
Ok(_) => (), Ok(_) => (),
@@ -1039,7 +1099,8 @@ where
}; };
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(self.process_id, server.server_id()); self.stats.transaction();
server.stats().transaction(&self.application_name);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
@@ -1060,11 +1121,11 @@ where
// The server is no longer bound to us, we can't cancel it's queries anymore. // The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool"); debug!("Releasing server back into the pool");
server.checkin_cleanup().await?; server.checkin_cleanup().await?;
self.stats.server_idle(server.server_id()); server.stats().idle();
self.connected_to_server = false; self.connected_to_server = false;
self.release(); self.release();
self.stats.client_idle(self.process_id); self.stats.idle();
} }
} }
@@ -1104,6 +1165,7 @@ where
server: &mut Server, server: &mut Server,
address: &Address, address: &Address,
pool: &ConnectionPool, pool: &ConnectionPool,
client_stats: &ClientStats,
) -> Result<(), Error> { ) -> Result<(), Error> {
debug!("Sending {} to server", code); debug!("Sending {} to server", code);
@@ -1119,7 +1181,9 @@ where
// Read all data the server has to offer, which can be multiple messages // Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks. // buffered in 8196 bytes chunks.
loop { loop {
let response = self.receive_server_message(server, address, pool).await?; let response = self
.receive_server_message(server, address, pool, client_stats)
.await?;
match write_all_half(&mut self.write, &response).await { match write_all_half(&mut self.write, &response).await {
Ok(_) => (), Ok(_) => (),
@@ -1135,10 +1199,10 @@ where
} }
// Report query executed statistics. // Report query executed statistics.
self.stats.query( client_stats.query();
self.process_id, server.stats().query(
server.server_id(), Instant::now().duration_since(query_start).as_millis() as u64,
Instant::now().duration_since(query_start).as_millis(), &self.application_name,
); );
Ok(()) Ok(())
@@ -1154,7 +1218,7 @@ where
match server.send(message).await { match server.send(message).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
pool.ban(address, BanReason::MessageSendFailed, self.process_id); pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
Err(err) Err(err)
} }
} }
@@ -1165,6 +1229,7 @@ where
server: &mut Server, server: &mut Server,
address: &Address, address: &Address,
pool: &ConnectionPool, pool: &ConnectionPool,
client_stats: &ClientStats,
) -> Result<BytesMut, Error> { ) -> Result<BytesMut, Error> {
if pool.settings.user.statement_timeout > 0 { if pool.settings.user.statement_timeout > 0 {
match tokio::time::timeout( match tokio::time::timeout(
@@ -1176,7 +1241,7 @@ where
Ok(result) => match result { Ok(result) => match result {
Ok(message) => Ok(message), Ok(message) => Ok(message),
Err(err) => { Err(err) => {
pool.ban(address, BanReason::MessageReceiveFailed, self.process_id); pool.ban(address, BanReason::MessageReceiveFailed, Some(client_stats));
error_response_terminal( error_response_terminal(
&mut self.write, &mut self.write,
&format!("error receiving data from server: {:?}", err), &format!("error receiving data from server: {:?}", err),
@@ -1191,7 +1256,7 @@ where
address, pool.settings.user.username address, pool.settings.user.username
); );
server.mark_bad(); server.mark_bad();
pool.ban(address, BanReason::StatementTimeout, self.process_id); pool.ban(address, BanReason::StatementTimeout, Some(client_stats));
error_response_terminal(&mut self.write, "pool statement timeout").await?; error_response_terminal(&mut self.write, "pool statement timeout").await?;
Err(Error::StatementTimeout) Err(Error::StatementTimeout)
} }
@@ -1200,7 +1265,7 @@ where
match server.recv().await { match server.recv().await {
Ok(message) => Ok(message), Ok(message) => Ok(message),
Err(err) => { Err(err) => {
pool.ban(address, BanReason::MessageReceiveFailed, self.process_id); pool.ban(address, BanReason::MessageReceiveFailed, Some(client_stats));
error_response_terminal( error_response_terminal(
&mut self.write, &mut self.write,
&format!("error receiving data from server: {:?}", err), &format!("error receiving data from server: {:?}", err),
@@ -1220,9 +1285,9 @@ impl<S, T> Drop for Client<S, T> {
// Dirty shutdown // Dirty shutdown
// TODO: refactor, this is not the best way to handle state management. // TODO: refactor, this is not the best way to handle state management.
self.stats.client_disconnecting(self.process_id);
if self.connected_to_server && self.last_server_id.is_some() { if self.connected_to_server && self.last_server_stats.is_some() {
self.stats.server_idle(self.last_server_id.unwrap()); self.last_server_stats.as_ref().unwrap().idle();
} }
} }
} }

View File

@@ -15,6 +15,7 @@ use tokio::io::AsyncReadExt;
use crate::errors::Error; use crate::errors::Error;
use crate::pool::{ClientServerMap, ConnectionPool}; use crate::pool::{ClientServerMap, ConnectionPool};
use crate::sharding::ShardingFunction; use crate::sharding::ShardingFunction;
use crate::stats::AddressStats;
use crate::tls::{load_certs, load_keys}; use crate::tls::{load_certs, load_keys};
pub const VERSION: &str = env!("CARGO_PKG_VERSION"); pub const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -62,7 +63,7 @@ impl PartialEq<Role> for Option<Role> {
} }
/// Address identifying a PostgreSQL server uniquely. /// Address identifying a PostgreSQL server uniquely.
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] #[derive(Clone, Debug)]
pub struct Address { pub struct Address {
/// Unique ID per addressable Postgres server. /// Unique ID per addressable Postgres server.
pub id: usize, pub id: usize,
@@ -96,6 +97,9 @@ pub struct Address {
/// List of addresses to receive mirrored traffic. /// List of addresses to receive mirrored traffic.
pub mirrors: Vec<Address>, pub mirrors: Vec<Address>,
/// Address stats
pub stats: Arc<AddressStats>,
} }
impl Default for Address { impl Default for Address {
@@ -112,10 +116,46 @@ impl Default for Address {
username: String::from("username"), username: String::from("username"),
pool_name: String::from("pool_name"), pool_name: String::from("pool_name"),
mirrors: Vec::new(), mirrors: Vec::new(),
stats: Arc::new(AddressStats::default()),
} }
} }
} }
// Equality is written by hand (rather than derived) so that the `stats`
// field takes no part in the comparison.
impl PartialEq for Address {
    /// Returns `true` when every identifying field of `self` and `other`
    /// matches. The `stats` field is not compared.
    fn eq(&self, other: &Self) -> bool {
        // Pull the compared fields out of `other`; `..` leaves out `stats`.
        let Self {
            id,
            host,
            port,
            shard,
            address_index,
            replica_number,
            database,
            role,
            username,
            pool_name,
            mirrors,
            ..
        } = other;

        self.id == *id
            && self.host == *host
            && self.port == *port
            && self.shard == *shard
            && self.address_index == *address_index
            && self.replica_number == *replica_number
            && self.database == *database
            && self.role == *role
            && self.username == *username
            && self.pool_name == *pool_name
            && self.mirrors == *mirrors
    }
}

// Marker impl: declares the hand-written equality above to be a full
// equivalence relation.
impl Eq for Address {}
// Hashing is written by hand (rather than derived) so that the `stats`
// field is not fed into the hasher. The hashed fields are the same ones
// compared by the `PartialEq` impl for `Address`.
impl Hash for Address {
    /// Feeds every identifying field into `state`, in declaration order of
    /// the calls below. The `stats` field is not hashed.
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Borrow the hashed fields; `..` leaves out `stats`.
        let Self {
            id,
            host,
            port,
            shard,
            address_index,
            replica_number,
            database,
            role,
            username,
            pool_name,
            mirrors,
            ..
        } = self;

        id.hash(state);
        host.hash(state);
        port.hash(state);
        shard.hash(state);
        address_index.hash(state);
        replica_number.hash(state);
        database.hash(state);
        role.hash(state);
        username.hash(state);
        pool_name.hash(state);
        mirrors.hash(state);
    }
}
impl Address { impl Address {
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`. /// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
pub fn name(&self) -> String { pub fn name(&self) -> String {

View File

@@ -162,8 +162,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new())); let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
// Statistics reporting. // Statistics reporting.
let (stats_tx, stats_rx) = mpsc::channel(500_000); REPORTER.store(Arc::new(Reporter::default()));
REPORTER.store(Arc::new(Reporter::new(stats_tx.clone())));
// Connection pool that allows to query all shards and replicas. // Connection pool that allows to query all shards and replicas.
match ConnectionPool::from_config(client_server_map.clone()).await { match ConnectionPool::from_config(client_server_map.clone()).await {
@@ -175,7 +174,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}; };
tokio::task::spawn(async move { tokio::task::spawn(async move {
let mut stats_collector = Collector::new(stats_rx, stats_tx.clone()); let mut stats_collector = Collector::default();
stats_collector.collect().await; stats_collector.collect().await;
}); });

View File

@@ -1,11 +1,13 @@
use std::sync::Arc;
/// A mirrored PostgreSQL client. /// A mirrored PostgreSQL client.
/// Packets arrive to us through a channel from the main client and we send them to the server. /// Packets arrive to us through a channel from the main client and we send them to the server.
use bb8::Pool; use bb8::Pool;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use crate::config::{get_config, Address, Role, User}; use crate::config::{get_config, Address, Role, User};
use crate::pool::{ClientServerMap, ServerPool}; use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool};
use crate::stats::get_reporter; use crate::stats::PoolStats;
use log::{error, info, trace, warn}; use log::{error, info, trace, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -21,20 +23,24 @@ impl MirroredClient {
async fn create_pool(&self) -> Pool<ServerPool> { async fn create_pool(&self) -> Pool<ServerPool> {
let config = get_config(); let config = get_config();
let default = std::time::Duration::from_millis(10_000).as_millis() as u64; let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
let (connection_timeout, idle_timeout) = match config.pools.get(&self.address.pool_name) { let (connection_timeout, idle_timeout, cfg) =
Some(cfg) => ( match config.pools.get(&self.address.pool_name) {
cfg.connect_timeout.unwrap_or(default), Some(cfg) => (
cfg.idle_timeout.unwrap_or(default), cfg.connect_timeout.unwrap_or(default),
), cfg.idle_timeout.unwrap_or(default),
None => (default, default), cfg.clone(),
}; ),
None => (default, default, crate::config::Pool::default()),
};
let identifier = PoolIdentifier::new(&self.database, &self.user.username);
let manager = ServerPool::new( let manager = ServerPool::new(
self.address.clone(), self.address.clone(),
self.user.clone(), self.user.clone(),
self.database.as_str(), self.database.as_str(),
ClientServerMap::default(), ClientServerMap::default(),
get_reporter(), Arc::new(PoolStats::new(identifier, cfg.clone())),
); );
Pool::builder() Pool::builder()

View File

@@ -22,7 +22,7 @@ use crate::errors::Error;
use crate::server::Server; use crate::server::Server;
use crate::sharding::ShardingFunction; use crate::sharding::ShardingFunction;
use crate::stats::{get_reporter, Reporter}; use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
pub type ProcessId = i32; pub type ProcessId = i32;
pub type SecretKey = i32; pub type SecretKey = i32;
@@ -51,7 +51,7 @@ pub enum BanReason {
/// An identifier for a PgCat pool, /// An identifier for a PgCat pool,
/// a database visible to clients. /// a database visible to clients.
#[derive(Hash, Debug, Clone, PartialEq, Eq)] #[derive(Hash, Debug, Clone, PartialEq, Eq, Default)]
pub struct PoolIdentifier { pub struct PoolIdentifier {
// The name of the database clients want to connect to. // The name of the database clients want to connect to.
pub db: String, pub db: String,
@@ -161,10 +161,6 @@ pub struct ConnectionPool {
/// that should not be queried. /// that should not be queried.
banlist: BanList, banlist: BanList,
/// The statistics aggregator runs in a separate task
/// and receives stats from clients, servers, and the pool.
stats: Reporter,
/// The server information (K messages) have to be passed to the /// The server information (K messages) have to be passed to the
/// clients on startup. We pre-connect to all shards and replicas /// clients on startup. We pre-connect to all shards and replicas
/// on pool creation and save the K messages here. /// on pool creation and save the K messages here.
@@ -185,6 +181,8 @@ pub struct ConnectionPool {
/// If the pool has been paused or not. /// If the pool has been paused or not.
paused: Arc<AtomicBool>, paused: Arc<AtomicBool>,
paused_waiter: Arc<Notify>, paused_waiter: Arc<Notify>,
pub stats: Arc<PoolStats>,
} }
impl ConnectionPool { impl ConnectionPool {
@@ -201,6 +199,7 @@ impl ConnectionPool {
// There is one pool per database/user pair. // There is one pool per database/user pair.
for user in pool_config.users.values() { for user in pool_config.users.values() {
let old_pool_ref = get_pool(pool_name, &user.username); let old_pool_ref = get_pool(pool_name, &user.username);
let identifier = PoolIdentifier::new(pool_name, &user.username);
match old_pool_ref { match old_pool_ref {
Some(pool) => { Some(pool) => {
@@ -211,10 +210,7 @@ impl ConnectionPool {
"[pool: {}][user: {}] has not changed", "[pool: {}][user: {}] has not changed",
pool_name, user.username pool_name, user.username
); );
new_pools.insert( new_pools.insert(identifier.clone(), pool.clone());
PoolIdentifier::new(pool_name, &user.username),
pool.clone(),
);
continue; continue;
} }
} }
@@ -234,6 +230,10 @@ impl ConnectionPool {
.clone() .clone()
.into_keys() .into_keys()
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let pool_stats = Arc::new(PoolStats::new(identifier, pool_config.clone()));
// Allow the pool to be seen in statistics
pool_stats.register(pool_stats.clone());
// Sort by shard number to ensure consistency. // Sort by shard number to ensure consistency.
shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap()); shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap());
@@ -266,6 +266,7 @@ impl ConnectionPool {
username: user.username.clone(), username: user.username.clone(),
pool_name: pool_name.clone(), pool_name: pool_name.clone(),
mirrors: vec![], mirrors: vec![],
stats: Arc::new(AddressStats::default()),
}); });
address_id += 1; address_id += 1;
} }
@@ -283,6 +284,7 @@ impl ConnectionPool {
username: user.username.clone(), username: user.username.clone(),
pool_name: pool_name.clone(), pool_name: pool_name.clone(),
mirrors: mirror_addresses, mirrors: mirror_addresses,
stats: Arc::new(AddressStats::default()),
}; };
address_id += 1; address_id += 1;
@@ -296,7 +298,7 @@ impl ConnectionPool {
user.clone(), user.clone(),
&shard.database, &shard.database,
client_server_map.clone(), client_server_map.clone(),
get_reporter(), pool_stats.clone(),
); );
let connect_timeout = match pool_config.connect_timeout { let connect_timeout = match pool_config.connect_timeout {
@@ -331,9 +333,9 @@ impl ConnectionPool {
let pool = ConnectionPool { let pool = ConnectionPool {
databases: shards, databases: shards,
stats: pool_stats,
addresses, addresses,
banlist: Arc::new(RwLock::new(banlist)), banlist: Arc::new(RwLock::new(banlist)),
stats: get_reporter(),
config_hash: new_pool_hash_value, config_hash: new_pool_hash_value,
server_info: Arc::new(RwLock::new(BytesMut::new())), server_info: Arc::new(RwLock::new(BytesMut::new())),
settings: PoolSettings { settings: PoolSettings {
@@ -476,9 +478,9 @@ impl ConnectionPool {
/// Get a connection from the pool. /// Get a connection from the pool.
pub async fn get( pub async fn get(
&self, &self,
shard: usize, // shard number shard: usize, // shard number
role: Option<Role>, // primary or replica role: Option<Role>, // primary or replica
client_process_id: i32, // client id client_stats: &ClientStats, // client id
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
let mut candidates: Vec<&Address> = self.addresses[shard] let mut candidates: Vec<&Address> = self.addresses[shard]
.iter() .iter()
@@ -517,7 +519,7 @@ impl ConnectionPool {
// Indicate we're waiting on a server connection from a pool. // Indicate we're waiting on a server connection from a pool.
let now = Instant::now(); let now = Instant::now();
self.stats.client_waiting(client_process_id); client_stats.waiting();
// Check if we can connect // Check if we can connect
let mut conn = match self.databases[address.shard][address.address_index] let mut conn = match self.databases[address.shard][address.address_index]
@@ -527,9 +529,10 @@ impl ConnectionPool {
Ok(conn) => conn, Ok(conn) => conn,
Err(err) => { Err(err) => {
error!("Banning instance {:?}, error: {:?}", address, err); error!("Banning instance {:?}, error: {:?}", address, err);
self.ban(address, BanReason::FailedCheckout, client_process_id); self.ban(address, BanReason::FailedCheckout, Some(client_stats));
self.stats address.stats.error();
.client_checkout_error(client_process_id, address.id); client_stats.idle();
client_stats.checkout_error();
continue; continue;
} }
}; };
@@ -546,18 +549,18 @@ impl ConnectionPool {
// since we last checked the server is ok. // since we last checked the server is ok.
// Health checks are pretty expensive. // Health checks are pretty expensive.
if !require_healthcheck { if !require_healthcheck {
self.stats.checkout_time( let checkout_time: u64 = now.elapsed().as_micros() as u64;
now.elapsed().as_micros(), client_stats.checkout_time(checkout_time);
client_process_id, server
server.server_id(), .stats()
); .checkout_time(checkout_time, client_stats.application_name());
self.stats server.stats().active(client_stats.application_name());
.server_active(client_process_id, server.server_id());
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
if self if self
.run_health_check(address, server, now, client_process_id) .run_health_check(address, server, now, client_stats)
.await .await
{ {
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
@@ -565,7 +568,6 @@ impl ConnectionPool {
continue; continue;
} }
} }
Err(Error::AllServersDown) Err(Error::AllServersDown)
} }
@@ -574,11 +576,11 @@ impl ConnectionPool {
address: &Address, address: &Address,
server: &mut Server, server: &mut Server,
start: Instant, start: Instant,
client_process_id: i32, client_info: &ClientStats,
) -> bool { ) -> bool {
debug!("Running health check on server {:?}", address); debug!("Running health check on server {:?}", address);
self.stats.server_tested(server.server_id()); server.stats().tested();
match tokio::time::timeout( match tokio::time::timeout(
tokio::time::Duration::from_millis(self.settings.healthcheck_timeout), tokio::time::Duration::from_millis(self.settings.healthcheck_timeout),
@@ -589,13 +591,13 @@ impl ConnectionPool {
// Check if health check succeeded. // Check if health check succeeded.
Ok(res) => match res { Ok(res) => match res {
Ok(_) => { Ok(_) => {
self.stats.checkout_time( let checkout_time: u64 = start.elapsed().as_micros() as u64;
start.elapsed().as_micros(), client_info.checkout_time(checkout_time);
client_process_id, server
server.server_id(), .stats()
); .checkout_time(checkout_time, client_info.application_name());
self.stats server.stats().active(client_info.application_name());
.server_active(client_process_id, server.server_id());
return true; return true;
} }
@@ -620,14 +622,14 @@ impl ConnectionPool {
// Don't leave a bad connection in the pool. // Don't leave a bad connection in the pool.
server.mark_bad(); server.mark_bad();
self.ban(&address, BanReason::FailedHealthCheck, client_process_id); self.ban(&address, BanReason::FailedHealthCheck, Some(client_info));
return false; return false;
} }
/// Ban an address (i.e. replica). It no longer will serve /// Ban an address (i.e. replica). It no longer will serve
/// traffic for any new transactions. Existing transactions on that replica /// traffic for any new transactions. Existing transactions on that replica
/// will finish successfully or error out to the clients. /// will finish successfully or error out to the clients.
pub fn ban(&self, address: &Address, reason: BanReason, client_id: i32) { pub fn ban(&self, address: &Address, reason: BanReason, client_info: Option<&ClientStats>) {
// Primary can never be banned // Primary can never be banned
if address.role == Role::Primary { if address.role == Role::Primary {
return; return;
@@ -636,7 +638,10 @@ impl ConnectionPool {
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();
let mut guard = self.banlist.write(); let mut guard = self.banlist.write();
error!("Banning {:?}", address); error!("Banning {:?}", address);
self.stats.client_ban_error(client_id, address.id); if let Some(client_info) = client_info {
client_info.ban_error();
address.stats.error();
}
guard[address.shard].insert(address.clone(), (reason, now)); guard[address.shard].insert(address.clone(), (reason, now));
} }
@@ -797,7 +802,7 @@ pub struct ServerPool {
user: User, user: User,
database: String, database: String,
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
stats: Reporter, stats: Arc<PoolStats>,
} }
impl ServerPool { impl ServerPool {
@@ -806,11 +811,11 @@ impl ServerPool {
user: User, user: User,
database: &str, database: &str,
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
stats: Reporter, stats: Arc<PoolStats>,
) -> ServerPool { ) -> ServerPool {
ServerPool { ServerPool {
address, address,
user, user: user.clone(),
database: database.to_string(), database: database.to_string(),
client_server_map, client_server_map,
stats, stats,
@@ -826,34 +831,31 @@ impl ManageConnection for ServerPool {
/// Attempts to create a new connection. /// Attempts to create a new connection.
async fn connect(&self) -> Result<Self::Connection, Self::Error> { async fn connect(&self) -> Result<Self::Connection, Self::Error> {
info!("Creating a new server connection {:?}", self.address); info!("Creating a new server connection {:?}", self.address);
let server_id = rand::random::<i32>();
self.stats.server_register( let stats = Arc::new(ServerStats::new(
server_id, self.address.clone(),
self.address.id, self.stats.clone(),
self.address.name(), tokio::time::Instant::now(),
self.address.pool_name.clone(), ));
self.address.username.clone(),
); stats.register(stats.clone());
self.stats.server_login(server_id);
// Connect to the PostgreSQL server. // Connect to the PostgreSQL server.
match Server::startup( match Server::startup(
server_id,
&self.address, &self.address,
&self.user, &self.user,
&self.database, &self.database,
self.client_server_map.clone(), self.client_server_map.clone(),
self.stats.clone(), stats.clone(),
) )
.await .await
{ {
Ok(conn) => { Ok(conn) => {
self.stats.server_idle(server_id); stats.idle();
Ok(conn) Ok(conn)
} }
Err(err) => { Err(err) => {
self.stats.server_disconnecting(server_id); stats.disconnect();
Err(err) Err(err)
} }
} }
@@ -881,11 +883,3 @@ pub fn get_pool(db: &str, user: &str) -> Option<ConnectionPool> {
pub fn get_all_pools() -> HashMap<PoolIdentifier, ConnectionPool> { pub fn get_all_pools() -> HashMap<PoolIdentifier, ConnectionPool> {
(*(*POOLS.load())).clone() (*(*POOLS.load())).clone()
} }
/// How many total servers we have in the config.
pub fn get_number_of_addresses() -> usize {
get_all_pools()
.iter()
.map(|(_, pool)| pool.databases())
.sum()
}

View File

@@ -5,10 +5,12 @@ use phf::phf_map;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::config::Address; use crate::config::Address;
use crate::pool::get_all_pools; use crate::pool::get_all_pools;
use crate::stats::{get_address_stats, get_pool_stats, get_server_stats, ServerInformation}; use crate::stats::{get_pool_stats, get_server_stats, ServerStats};
struct MetricHelpType { struct MetricHelpType {
help: &'static str, help: &'static str,
@@ -220,7 +222,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("servers_{}", name), value, labels) Self::from_name(&format!("servers_{}", name), value, labels)
} }
fn from_address(address: &Address, name: &str, value: i64) -> Option<PrometheusMetric<i64>> { fn from_address(address: &Address, name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
let mut labels = HashMap::new(); let mut labels = HashMap::new();
labels.insert("host", address.host.clone()); labels.insert("host", address.host.clone());
labels.insert("shard", address.shard.to_string()); labels.insert("shard", address.shard.to_string());
@@ -231,7 +233,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("stats_{}", name), value, labels) Self::from_name(&format!("stats_{}", name), value, labels)
} }
fn from_pool(pool: &(String, String), name: &str, value: i64) -> Option<PrometheusMetric<i64>> { fn from_pool(pool: &(String, String), name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
let mut labels = HashMap::new(); let mut labels = HashMap::new();
labels.insert("pool", pool.0.clone()); labels.insert("pool", pool.0.clone());
labels.insert("user", pool.1.clone()); labels.insert("user", pool.1.clone());
@@ -261,20 +263,18 @@ async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hype
// Adds metrics shown in a SHOW STATS admin command. // Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) { fn push_address_stats(lines: &mut Vec<String>) {
let address_stats: HashMap<usize, HashMap<String, i64>> = get_address_stats();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
let address = pool.address(shard, server); let address = pool.address(shard, server);
if let Some(address_stats) = address_stats.get(&address.id) { let stats = &*address.stats;
for (key, value) in address_stats.iter() { for (key, value) in stats.clone() {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<i64>::from_address(address, key, *value) PrometheusMetric::<u64>::from_address(address, &key, value)
{ {
lines.push(prometheus_metric.to_string()); lines.push(prometheus_metric.to_string());
} else { } else {
warn!("Metric {} not implemented for {}", key, address.name()); warn!("Metric {} not implemented for {}", key, address.name());
}
} }
} }
} }
@@ -286,8 +286,9 @@ fn push_address_stats(lines: &mut Vec<String>) {
fn push_pool_stats(lines: &mut Vec<String>) { fn push_pool_stats(lines: &mut Vec<String>) {
let pool_stats = get_pool_stats(); let pool_stats = get_pool_stats();
for (pool, stats) in pool_stats.iter() { for (pool, stats) in pool_stats.iter() {
for (name, value) in stats.iter() { let stats = &**stats;
if let Some(prometheus_metric) = PrometheusMetric::<i64>::from_pool(pool, name, *value) for (name, value) in stats.clone() {
if let Some(prometheus_metric) = PrometheusMetric::<u64>::from_pool(pool, &name, value)
{ {
lines.push(prometheus_metric.to_string()); lines.push(prometheus_metric.to_string());
} else { } else {
@@ -330,9 +331,9 @@ fn push_database_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW SERVERS admin command. // Adds relevant metrics shown in a SHOW SERVERS admin command.
fn push_server_stats(lines: &mut Vec<String>) { fn push_server_stats(lines: &mut Vec<String>) {
let server_stats = get_server_stats(); let server_stats = get_server_stats();
let mut server_stats_by_addresses = HashMap::<String, ServerInformation>::new(); let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
for (_, info) in server_stats { for (_, stats) in server_stats {
server_stats_by_addresses.insert(info.address_name.clone(), info); server_stats_by_addresses.insert(stats.address_name(), stats);
} }
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
@@ -341,11 +342,23 @@ fn push_server_stats(lines: &mut Vec<String>) {
let address = pool.address(shard, server); let address = pool.address(shard, server);
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) { if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
let metrics = [ let metrics = [
("bytes_received", server_info.bytes_received), (
("bytes_sent", server_info.bytes_sent), "bytes_received",
("transaction_count", server_info.transaction_count), server_info.bytes_received.load(Ordering::Relaxed),
("query_count", server_info.query_count), ),
("error_count", server_info.error_count), ("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
(
"transaction_count",
server_info.transaction_count.load(Ordering::Relaxed),
),
(
"query_count",
server_info.query_count.load(Ordering::Relaxed),
),
(
"error_count",
server_info.error_count.load(Ordering::Relaxed),
),
]; ];
for (key, value) in metrics { for (key, value) in metrics {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =

View File

@@ -3,6 +3,7 @@
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use std::io::Read; use std::io::Read;
use std::sync::Arc;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::io::{AsyncReadExt, BufReader}; use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{ use tokio::net::{
@@ -17,12 +18,10 @@ use crate::messages::*;
use crate::mirrors::MirroringManager; use crate::mirrors::MirroringManager;
use crate::pool::ClientServerMap; use crate::pool::ClientServerMap;
use crate::scram::ScramSha256; use crate::scram::ScramSha256;
use crate::stats::Reporter; use crate::stats::ServerStats;
/// Server state. /// Server state.
pub struct Server { pub struct Server {
server_id: i32,
/// Server host, e.g. localhost, /// Server host, e.g. localhost,
/// port, e.g. 5432, and role, e.g. primary or replica. /// port, e.g. 5432, and role, e.g. primary or replica.
address: Address, address: Address,
@@ -62,7 +61,7 @@ pub struct Server {
connected_at: chrono::naive::NaiveDateTime, connected_at: chrono::naive::NaiveDateTime,
/// Reports various metrics, e.g. data sent & received. /// Reports various metrics, e.g. data sent & received.
stats: Reporter, stats: Arc<ServerStats>,
/// Application name using the server at the moment. /// Application name using the server at the moment.
application_name: String, application_name: String,
@@ -77,12 +76,11 @@ impl Server {
/// Pretend to be the Postgres client and connect to the server given host, port and credentials. /// Pretend to be the Postgres client and connect to the server given host, port and credentials.
/// Perform the authentication and return the server in a ready for query state. /// Perform the authentication and return the server in a ready for query state.
pub async fn startup( pub async fn startup(
server_id: i32,
address: &Address, address: &Address,
user: &User, user: &User,
database: &str, database: &str,
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
stats: Reporter, stats: Arc<ServerStats>,
) -> Result<Server, Error> { ) -> Result<Server, Error> {
let mut stream = let mut stream =
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await { match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
@@ -325,7 +323,6 @@ impl Server {
write, write,
buffer: BytesMut::with_capacity(8196), buffer: BytesMut::with_capacity(8196),
server_info, server_info,
server_id,
process_id, process_id,
secret_key, secret_key,
in_transaction: false, in_transaction: false,
@@ -396,7 +393,7 @@ impl Server {
/// Send messages to the server from the client. /// Send messages to the server from the client.
pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> { pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> {
self.mirror_send(messages); self.mirror_send(messages);
self.stats.data_sent(messages.len(), self.server_id); self.stats().data_sent(messages.len());
match write_all_half(&mut self.write, messages).await { match write_all_half(&mut self.write, messages).await {
Ok(_) => { Ok(_) => {
@@ -545,7 +542,7 @@ impl Server {
let bytes = self.buffer.clone(); let bytes = self.buffer.clone();
// Keep track of how much data we got from the server for stats. // Keep track of how much data we got from the server for stats.
self.stats.data_received(bytes.len(), self.server_id); self.stats().data_received(bytes.len());
// Clear the buffer for next query. // Clear the buffer for next query.
self.buffer.clear(); self.buffer.clear();
@@ -665,18 +662,17 @@ impl Server {
} }
} }
/// get Server stats
pub fn stats(&self) -> Arc<ServerStats> {
self.stats.clone()
}
/// Get the servers address. /// Get the servers address.
#[allow(dead_code)] #[allow(dead_code)]
pub fn address(&self) -> Address { pub fn address(&self) -> Address {
self.address.clone() self.address.clone()
} }
/// Get the server connection identifier
/// Used to uniquely identify connection in statistics
pub fn server_id(&self) -> i32 {
self.server_id
}
// Get server's latest response timestamp // Get server's latest response timestamp
pub fn last_activity(&self) -> SystemTime { pub fn last_activity(&self) -> SystemTime {
self.last_activity self.last_activity
@@ -708,7 +704,9 @@ impl Drop for Server {
/// for a write. /// for a write.
fn drop(&mut self) { fn drop(&mut self) {
self.mirror_disconnect(); self.mirror_disconnect();
self.stats.server_disconnecting(self.server_id);
// Update statistics
self.stats.disconnect();
let mut bytes = BytesMut::with_capacity(4); let mut bytes = BytesMut::with_capacity(4);
bytes.put_u8(b'X'); bytes.put_u8(b'X');

File diff suppressed because it is too large Load Diff

149
src/stats/address.rs Normal file
View File

@@ -0,0 +1,149 @@
use log::warn;
use std::sync::atomic::*;
use std::sync::Arc;
/// Internal address stats.
///
/// Every field is an `Arc<AtomicU64>`, so the derived `Clone` produces a
/// handle onto the same underlying counters rather than an independent
/// snapshot.
///
/// Counters come in `total_*` / `avg_*` pairs: `update_averages` rewrites each
/// `avg_*` value from its matching `total_*` value. The pairing is the one
/// listed in `fields_iterators` (for example `total_received` <-> `avg_recv`).
#[derive(Debug, Clone, Default)]
pub struct AddressStats {
    // Running totals. Within this file only `total_errors` is written (by
    // `error`); the other totals are public and presumably updated by code
    // elsewhere.
    // NOTE(review): the units of the `*_time`, `*_received` and `*_sent`
    // counters are not visible from this file - confirm with the writers.
    pub total_xact_count: Arc<AtomicU64>,
    pub total_query_count: Arc<AtomicU64>,
    pub total_received: Arc<AtomicU64>,
    pub total_sent: Arc<AtomicU64>,
    pub total_xact_time: Arc<AtomicU64>,
    pub total_query_time: Arc<AtomicU64>,
    pub total_wait_time: Arc<AtomicU64>,
    pub total_errors: Arc<AtomicU64>,
    // Derived values, overwritten by `update_averages`.
    pub avg_query_count: Arc<AtomicU64>,
    pub avg_query_time: Arc<AtomicU64>,
    pub avg_recv: Arc<AtomicU64>,
    pub avg_sent: Arc<AtomicU64>,
    pub avg_errors: Arc<AtomicU64>,
    pub avg_xact_time: Arc<AtomicU64>,
    pub avg_xact_count: Arc<AtomicU64>,
    pub avg_wait_time: Arc<AtomicU64>,
}
impl IntoIterator for AddressStats {
    type Item = (String, u64);
    type IntoIter = std::vec::IntoIter<Self::Item>;

    /// Snapshots every counter as a `(name, value)` pair.
    ///
    /// All eight totals come first, followed by the eight averages. Each
    /// value is read with a relaxed load at the moment of the call, so the
    /// pairs are not a single atomic snapshot of the whole struct.
    fn into_iter(self) -> Self::IntoIter {
        // Name/counter table; the order here is the order of the output.
        let counters = [
            ("total_xact_count", &self.total_xact_count),
            ("total_query_count", &self.total_query_count),
            ("total_received", &self.total_received),
            ("total_sent", &self.total_sent),
            ("total_xact_time", &self.total_xact_time),
            ("total_query_time", &self.total_query_time),
            ("total_wait_time", &self.total_wait_time),
            ("total_errors", &self.total_errors),
            ("avg_xact_count", &self.avg_xact_count),
            ("avg_query_count", &self.avg_query_count),
            ("avg_recv", &self.avg_recv),
            ("avg_sent", &self.avg_sent),
            ("avg_errors", &self.avg_errors),
            ("avg_xact_time", &self.avg_xact_time),
            ("avg_query_time", &self.avg_query_time),
            ("avg_wait_time", &self.avg_wait_time),
        ];

        counters
            .iter()
            .map(|&(name, counter)| (name.to_string(), counter.load(Ordering::Relaxed)))
            .collect::<Vec<_>>()
            .into_iter()
    }
}
impl AddressStats {
pub fn error(&self) {
self.total_errors.fetch_add(1, Ordering::Relaxed);
}
pub fn update_averages(&self) {
let (totals, averages) = self.fields_iterators();
for data in totals.iter().zip(averages.iter()) {
let (total, average) = data;
if let Err(err) = average.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
let total = total.load(Ordering::Relaxed);
let avg = (total - avg) / (crate::stats::STAT_PERIOD / 1_000); // Avg / second
Some(avg)
}) {
warn!("Could not update averages for addresses stats, {:?}", err);
}
}
}
pub fn populate_row(&self, row: &mut Vec<String>) {
for (_key, value) in self.clone() {
row.push(value.to_string());
}
}
fn fields_iterators(&self) -> (Vec<Arc<AtomicU64>>, Vec<Arc<AtomicU64>>) {
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
totals.push(self.total_xact_count.clone());
averages.push(self.avg_xact_count.clone());
totals.push(self.total_query_count.clone());
averages.push(self.avg_query_count.clone());
totals.push(self.total_received.clone());
averages.push(self.avg_recv.clone());
totals.push(self.total_sent.clone());
averages.push(self.avg_sent.clone());
totals.push(self.total_xact_time.clone());
averages.push(self.avg_xact_time.clone());
totals.push(self.total_query_time.clone());
averages.push(self.avg_query_time.clone());
totals.push(self.total_wait_time.clone());
averages.push(self.avg_wait_time.clone());
totals.push(self.total_errors.clone());
averages.push(self.avg_errors.clone());
(totals, averages)
}
}

182
src/stats/client.rs Normal file
View File

@@ -0,0 +1,182 @@
use super::PoolStats;
use super::{get_reporter, Reporter};
use atomic_enum::atomic_enum;
use std::sync::atomic::*;
use std::sync::Arc;
use tokio::time::Instant;
/// The various states that a client can be in.
///
/// The `atomic_enum` attribute is what provides the `AtomicClientState`
/// wrapper that `ClientStats` stores its current state in.
#[atomic_enum]
#[derive(PartialEq)]
pub enum ClientState {
    /// Not assigned a server connection; also the state set on registration.
    Idle = 0,
    /// Waiting for a connection from the pool.
    Waiting,
    /// Done waiting for a connection and about to query the server.
    Active,
}
impl std::fmt::Display for ClientState {
    /// Writes the lowercase name of the state (`idle`, `waiting`, `active`).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            ClientState::Idle => "idle",
            ClientState::Waiting => "waiting",
            ClientState::Active => "active",
        };
        // Written verbatim: width/padding flags are not applied.
        f.write_str(label)
    }
}
#[derive(Debug, Clone)]
/// Information we keep track of which can be queried by SHOW CLIENTS.
///
/// The state and counter fields are `Arc`-wrapped atomics, so the derived
/// `Clone` shares them with the original instead of copying their values.
pub struct ClientStats {
    /// A random integer assigned to the client and used by stats to track the client
    client_id: i32,

    /// Data associated with the client, not writable, only set when we construct the ClientStat
    application_name: String,
    username: String,
    pool_name: String,
    /// Connection timestamp as passed to `new` (`default` uses `Instant::now()`).
    connect_time: Instant,

    /// Counters of the pool this client belongs to; updated by `register`,
    /// `idle`, `waiting`, `active` and `disconnect`.
    pool_stats: Arc<PoolStats>,
    /// Reporter handle used by `register` and `disconnect` to add this client
    /// to, and remove it from, the stats system.
    reporter: Reporter,

    /// Total time spent waiting for a connection from pool, measures in microseconds
    pub total_wait_time: Arc<AtomicU64>,

    /// Current state of the client
    pub state: Arc<AtomicClientState>,

    /// Number of transactions executed by this client
    pub transaction_count: Arc<AtomicU64>,

    /// Number of queries executed by this client
    pub query_count: Arc<AtomicU64>,

    /// Number of errors made by this client (within this file it is only
    /// incremented by `ban_error`)
    pub error_count: Arc<AtomicU64>,
}
impl Default for ClientStats {
    /// Builds a placeholder client: id `0`, empty names, connect time "now",
    /// a default `PoolStats`, the global reporter, the `Idle` state and all
    /// counters at zero.
    fn default() -> Self {
        // Every numeric counter starts from its own fresh zeroed atomic.
        let zeroed_counter = || Arc::new(AtomicU64::new(0));

        Self {
            client_id: 0,
            application_name: String::new(),
            username: String::new(),
            pool_name: String::new(),
            connect_time: Instant::now(),
            pool_stats: Arc::new(PoolStats::default()),
            reporter: get_reporter(),
            total_wait_time: zeroed_counter(),
            state: Arc::new(AtomicClientState::new(ClientState::Idle)),
            transaction_count: zeroed_counter(),
            query_count: zeroed_counter(),
            error_count: zeroed_counter(),
        }
    }
}
impl ClientStats {
    /// Creates the stats record for a client.
    ///
    /// The identifying data is taken from the arguments; the state and all
    /// counters start from `ClientStats::default()` (idle, zeroed).
    pub fn new(
        client_id: i32,
        application_name: &str,
        username: &str,
        pool_name: &str,
        connect_time: Instant,
        pool_stats: Arc<PoolStats>,
    ) -> Self {
        Self {
            client_id,
            application_name: String::from(application_name),
            username: String::from(username),
            pool_name: String::from(pool_name),
            connect_time,
            pool_stats,
            ..Self::default()
        }
    }

    /// Reports that the client is disconnecting from the pooler and updates
    /// the metrics of the corresponding pool.
    pub fn disconnect(&self) {
        self.reporter.client_disconnecting(self.client_id);

        let last_state = self.state.load(Ordering::Relaxed);
        self.pool_stats.client_disconnect(last_state);
    }

    /// Registers the client with the stats system, which uses `client_id` to
    /// track and aggregate statistics from all sources related to this client.
    ///
    /// The client starts out idle, so the pool's `cl_idle` counter is bumped.
    pub fn register(&self, stats: Arc<ClientStats>) {
        self.reporter.client_register(self.client_id, stats);
        self.state.store(ClientState::Idle, Ordering::Relaxed);
        self.pool_stats.cl_idle.fetch_add(1, Ordering::Relaxed);
    }

    /// Reports that the client is done querying the server and is no longer
    /// assigned a server connection.
    pub fn idle(&self) {
        // The pool is told which state the client is leaving.
        let previous = self.state.load(Ordering::Relaxed);
        self.pool_stats.client_idle(previous);
        self.state.store(ClientState::Idle, Ordering::Relaxed);
    }

    /// Reports that the client is waiting for a connection.
    pub fn waiting(&self) {
        let previous = self.state.load(Ordering::Relaxed);
        self.pool_stats.client_waiting(previous);
        self.state.store(ClientState::Waiting, Ordering::Relaxed);
    }

    /// Reports that the client is done waiting for a connection and is about
    /// to query the server.
    pub fn active(&self) {
        let previous = self.state.load(Ordering::Relaxed);
        self.pool_stats.client_active(previous);
        self.state.store(ClientState::Active, Ordering::Relaxed);
    }

    /// Reports that the client has failed to obtain a connection from a
    /// connection pool.
    ///
    /// NOTE(review): only the client's own state is reset here; the pool
    /// counters are not touched, unlike in `idle` - confirm that callers
    /// handle the pool-side transition themselves.
    pub fn checkout_error(&self) {
        self.state.store(ClientState::Idle, Ordering::Relaxed);
    }

    /// Reports that the server assigned to the client has been banned; the
    /// client goes back to idle and its error counter is incremented.
    ///
    /// NOTE(review): as with `checkout_error`, the pool counters are not
    /// adjusted here - confirm this is intended.
    pub fn ban_error(&self) {
        self.state.store(ClientState::Idle, Ordering::Relaxed);
        self.error_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Reports the time, in microseconds, the client spent waiting to get a
    /// healthy connection from the pool.
    pub fn checkout_time(&self, microseconds: u64) {
        self.total_wait_time
            .fetch_add(microseconds, Ordering::Relaxed);
    }

    /// Reports a query executed by the client against a server.
    pub fn query(&self) {
        self.query_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Reports a transaction executed by the client against a server.
    ///
    /// Each individual query outside a transaction is reported as a
    /// transaction. Inside an explicit transaction only the initial BEGIN
    /// counts; the queries within it do not count as transactions.
    pub fn transaction(&self) {
        self.transaction_count.fetch_add(1, Ordering::Relaxed);
    }

    // Helper methods for show clients

    /// Time at which the client connected.
    pub fn connect_time(&self) -> Instant {
        self.connect_time
    }

    /// Identifier the stats system tracks this client by.
    pub fn client_id(&self) -> i32 {
        self.client_id
    }

    /// Owned copy of the client's application name.
    pub fn application_name(&self) -> String {
        self.application_name.to_owned()
    }

    /// Owned copy of the client's user name.
    pub fn username(&self) -> String {
        self.username.to_owned()
    }

    /// Owned copy of the name of the pool the client is connected to.
    pub fn pool_name(&self) -> String {
        self.pool_name.to_owned()
    }
}

274
src/stats/pool.rs Normal file
View File

@@ -0,0 +1,274 @@
use crate::config::Pool;
use crate::config::PoolMode;
use crate::pool::PoolIdentifier;
use std::sync::atomic::*;
use std::sync::Arc;
use super::get_reporter;
use super::Reporter;
use super::{ClientState, ServerState};
#[derive(Debug, Clone, Default)]
/// A struct that holds information about a pool.
///
/// All counters are `Arc<AtomicU64>`, so the derived `Clone` shares them with
/// the original instead of copying their values.
pub struct PoolStats {
    // Pool identifier, cannot be changed after creating the instance
    identifier: PoolIdentifier,

    // Pool Config, cannot be changed after creating the instance
    config: Pool,

    // A reference to the global reporter.
    reporter: Reporter,

    /// Counters (atomics)
    // NOTE(review): presumably `cl_*` count clients per state and `sv_*`
    // count server connections per state - confirm against the methods that
    // update them, which are not all visible here.
    pub cl_idle: Arc<AtomicU64>,
    pub cl_active: Arc<AtomicU64>,
    pub cl_waiting: Arc<AtomicU64>,
    pub cl_cancel_req: Arc<AtomicU64>,
    pub sv_active: Arc<AtomicU64>,
    pub sv_idle: Arc<AtomicU64>,
    pub sv_used: Arc<AtomicU64>,
    pub sv_tested: Arc<AtomicU64>,
    pub sv_login: Arc<AtomicU64>,
    // Kept in microseconds: iteration reports it as whole seconds (`maxwait`)
    // plus the sub-second remainder (`maxwait_us`).
    pub maxwait: Arc<AtomicU64>,
}
/// Turns the pool counters into `(name, value)` pairs.
///
/// The pairs are produced in a fixed order: the client counters, the server
/// counters, then `maxwait` (whole seconds) and `maxwait_us` (the remaining
/// microseconds).
impl IntoIterator for PoolStats {
    type Item = (String, u64);
    type IntoIter = std::vec::IntoIter<Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        // Read `maxwait` exactly once: the seconds part and the microseconds
        // remainder must be derived from the same value, otherwise a concurrent
        // update or reset between two loads yields a pair that matches no
        // value the counter ever held.
        let maxwait = self.maxwait.load(Ordering::Relaxed);

        vec![
            ("cl_idle".to_string(), self.cl_idle.load(Ordering::Relaxed)),
            (
                "cl_active".to_string(),
                self.cl_active.load(Ordering::Relaxed),
            ),
            (
                "cl_waiting".to_string(),
                self.cl_waiting.load(Ordering::Relaxed),
            ),
            (
                "cl_cancel_req".to_string(),
                self.cl_cancel_req.load(Ordering::Relaxed),
            ),
            (
                "sv_active".to_string(),
                self.sv_active.load(Ordering::Relaxed),
            ),
            ("sv_idle".to_string(), self.sv_idle.load(Ordering::Relaxed)),
            ("sv_used".to_string(), self.sv_used.load(Ordering::Relaxed)),
            (
                "sv_tested".to_string(),
                self.sv_tested.load(Ordering::Relaxed),
            ),
            (
                "sv_login".to_string(),
                self.sv_login.load(Ordering::Relaxed),
            ),
            ("maxwait".to_string(), maxwait / 1_000_000),
            ("maxwait_us".to_string(), maxwait % 1_000_000),
        ]
        .into_iter()
    }
}
impl PoolStats {
pub fn new(identifier: PoolIdentifier, config: Pool) -> Self {
Self {
identifier,
config,
reporter: get_reporter(),
..Default::default()
}
}
// Getters
pub fn register(&self, stats: Arc<PoolStats>) {
self.reporter.pool_register(self.identifier.clone(), stats);
}
pub fn database(&self) -> String {
self.identifier.db.clone()
}
pub fn user(&self) -> String {
self.identifier.user.clone()
}
pub fn pool_mode(&self) -> PoolMode {
self.config.pool_mode
}
/// Populates an array of strings with counters (used by admin in show pools)
pub fn populate_row(&self, row: &mut Vec<String>) {
for (_key, value) in self.clone() {
row.push(value.to_string());
}
}
/// Deletes the maxwait counter, this is done everytime we obtain metrics
pub fn clear_maxwait(&self) {
self.maxwait.store(0, Ordering::Relaxed);
}
/// Notified when a server of the pool enters login state.
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_login(&self, from: ServerState) {
self.sv_login.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Login {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'active'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_active(&self, from: ServerState) {
self.sv_active.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Active {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'tested'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_tested(&self, from: ServerState) {
self.sv_tested.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Tested {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'idle'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_idle(&self, from: ServerState) {
self.sv_idle.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Idle {
self.decrease_from_server_state(from);
}
}
/// Notified when a client of the pool become 'waiting'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_waiting(&self, from: ClientState) {
if from != ClientState::Waiting {
self.cl_waiting.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client of the pool become 'active'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_active(&self, from: ClientState) {
if from != ClientState::Active {
self.cl_active.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client of the pool become 'idle'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_idle(&self, from: ClientState) {
if from != ClientState::Idle {
self.cl_idle.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client disconnects.
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_disconnect(&self, from: ClientState) {
let counter = match from {
ClientState::Idle => &self.cl_idle,
ClientState::Waiting => &self.cl_waiting,
ClientState::Active => &self.cl_active,
};
Self::decrease_counter(counter.clone());
}
/// Notified when a server disconnects.
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn server_disconnect(&self, from: ServerState) {
let counter = match from {
ServerState::Active => &self.sv_active,
ServerState::Idle => &self.sv_idle,
ServerState::Login => &self.sv_login,
ServerState::Tested => &self.sv_tested,
};
Self::decrease_counter(counter.clone());
}
// helpers for counter decrease
fn decrease_from_server_state(&self, from: ServerState) {
let counter = match from {
ServerState::Tested => &self.sv_tested,
ServerState::Active => &self.sv_active,
ServerState::Idle => &self.sv_idle,
ServerState::Login => &self.sv_login,
};
Self::decrease_counter(counter.clone());
}
fn decrease_from_client_state(&self, from: ClientState) {
let counter = match from {
ClientState::Active => &self.cl_active,
ClientState::Idle => &self.cl_idle,
ClientState::Waiting => &self.cl_waiting,
};
Self::decrease_counter(counter.clone());
}
fn decrease_counter(value: Arc<AtomicU64>) {
if value.load(Ordering::Relaxed) > 0 {
value.fetch_sub(1, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// A server that logs in and then becomes idle must leave the login
    /// counter at zero and be counted once as idle.
    #[test]
    fn test_decrease() {
        let pool_stats = PoolStats::default();

        pool_stats.server_login(ServerState::Login);
        pool_stats.server_idle(ServerState::Login);

        let login_count = pool_stats.sv_login.load(Ordering::Relaxed);
        let idle_count = pool_stats.sv_idle.load(Ordering::Relaxed);
        assert_eq!(login_count, 0);
        assert_eq!(idle_count, 1);
    }
}

226
src/stats/server.rs Normal file
View File

@@ -0,0 +1,226 @@
use super::AddressStats;
use super::PoolStats;
use super::{get_reporter, Reporter};
use crate::config::Address;
use atomic_enum::atomic_enum;
use parking_lot::RwLock;
use std::sync::atomic::*;
use std::sync::Arc;
use tokio::time::Instant;
/// The various states that a server can be in.
///
/// The `#[atomic_enum]` attribute is what provides the `AtomicServerState`
/// type used to store this state atomically.
#[atomic_enum]
#[derive(PartialEq)]
pub enum ServerState {
// The server connection is attempting to log in; this is also the state a
// freshly built `ServerStats` starts in.
Login = 0,
// The server connection has been assigned to a client that is about to query it.
Active,
// The server connection is being tested before being given to a client.
Tested,
// The server connection is not assigned to a client and is available for the next one.
Idle,
}
/// Renders the state as its lowercase name (`login`, `active`, `tested`, `idle`).
impl std::fmt::Display for ServerState {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            ServerState::Login => "login",
            ServerState::Active => "active",
            ServerState::Tested => "tested",
            ServerState::Idle => "idle",
        };
        f.write_str(label)
    }
}
/// Information we keep track of which can be queried by SHOW SERVERS.
///
/// The mutable data lives behind `Arc`s, so a clone of a `ServerStats` (the
/// struct derives `Clone`) shares that data with the value it was cloned from.
#[derive(Debug, Clone)]
pub struct ServerStats {
/// A random integer assigned to the server and used by stats to track the server
server_id: i32,
/// Context information, only to be read
address: Address,
// Instant handed to `new` as the connection time (`Instant::now()` in `default`).
connect_time: Instant,
// Counters of the pool this server belongs to; state changes are reported to it.
pool_stats: Arc<PoolStats>,
// Reporter used to register the server and to announce its disconnection.
reporter: Reporter,
/// Application name currently associated with the server connection.
pub application_name: Arc<RwLock<String>>,
/// Current state of the server connection.
pub state: Arc<AtomicServerState>,
/// Bytes sent to the server.
pub bytes_sent: Arc<AtomicU64>,
/// Bytes received from the server.
pub bytes_received: Arc<AtomicU64>,
/// Transaction counter for this server connection.
pub transaction_count: Arc<AtomicU64>,
/// Query counter for this server connection.
pub query_count: Arc<AtomicU64>,
/// Error counter for this server connection.
pub error_count: Arc<AtomicU64>,
}
/// A default `ServerStats` has id 0, an empty application name, default
/// address and pool stats, starts in the `Login` state and has all of its
/// counters at zero.
impl Default for ServerStats {
    fn default() -> Self {
        // Each counter gets its own zero-initialised atomic.
        let zeroed = || Arc::new(AtomicU64::new(0));

        Self {
            server_id: 0,
            application_name: Arc::new(RwLock::new(String::new())),
            address: Address::default(),
            pool_stats: Arc::new(PoolStats::default()),
            connect_time: Instant::now(),
            state: Arc::new(AtomicServerState::new(ServerState::Login)),
            bytes_sent: zeroed(),
            bytes_received: zeroed(),
            transaction_count: zeroed(),
            query_count: zeroed(),
            error_count: zeroed(),
            reporter: get_reporter(),
        }
    }
}
impl ServerStats {
    /// Builds the stats of a server connection to `address` that belongs to
    /// the pool tracked by `pool_stats`.
    ///
    /// The server gets a random `server_id`; every other field takes the value
    /// provided by `Default`.
    pub fn new(address: Address, pool_stats: Arc<PoolStats>, connect_time: Instant) -> Self {
        Self {
            address,
            pool_stats,
            connect_time,
            server_id: rand::random::<i32>(),
            ..Default::default()
        }
    }

    /// Returns the identifier assigned to this server connection.
    pub fn server_id(&self) -> i32 {
        self.server_id
    }

    /// Register a server connection with the stats system. The stats system uses server_id
    /// to track and aggregate statistics from all sources that relate to that server.
    ///
    /// Delegates to the reporter and then reports the server as logging in.
    pub fn register(&self, stats: Arc<ServerStats>) {
        self.reporter.server_register(self.server_id, stats);
        self.login();
    }

    /// Reports a server connection is no longer assigned to a client
    /// and is available for the next client to pick it up.
    pub fn idle(&self) {
        let previous = self.state.load(Ordering::Relaxed);
        self.pool_stats.server_idle(previous);
        self.state.store(ServerState::Idle, Ordering::Relaxed);
        self.set_undefined_application();
    }

    /// Reports a server connection is disconnecting from the pooler.
    /// Also updates metrics on the pool regarding server usage.
    pub fn disconnect(&self) {
        self.reporter.server_disconnecting(self.server_id);
        self.pool_stats
            .server_disconnect(self.state.load(Ordering::Relaxed))
    }

    /// Reports a server connection is being tested before being given to a client.
    pub fn tested(&self) {
        self.set_undefined_application();
        let previous = self.state.load(Ordering::Relaxed);
        self.pool_stats.server_tested(previous);
        self.state.store(ServerState::Tested, Ordering::Relaxed);
    }

    /// Reports a server connection is attempting to login.
    pub fn login(&self) {
        let previous = self.state.load(Ordering::Relaxed);
        self.pool_stats.server_login(previous);
        self.state.store(ServerState::Login, Ordering::Relaxed);
        self.set_undefined_application();
    }

    /// Reports a server connection has been assigned to a client that
    /// is about to query the server.
    ///
    /// `application_name` becomes the application name shown for the server.
    pub fn active(&self, application_name: String) {
        let previous = self.state.load(Ordering::Relaxed);
        self.pool_stats.server_active(previous);
        self.state.store(ServerState::Active, Ordering::Relaxed);
        self.set_application(application_name);
    }

    /// Returns the aggregated stats of the address this server connects to.
    pub fn address_stats(&self) -> Arc<AddressStats> {
        self.address.stats.clone()
    }

    // Helper methods for show_servers

    /// Returns the database name of the pool this server belongs to.
    pub fn pool_name(&self) -> String {
        self.pool_stats.database()
    }

    /// Returns the user name of the pool this server belongs to.
    pub fn username(&self) -> String {
        self.pool_stats.user()
    }

    /// Returns the name of the address this server connects to.
    pub fn address_name(&self) -> String {
        self.address.name()
    }

    /// Returns the instant stored as the connection time of this server.
    pub fn connect_time(&self) -> Instant {
        self.connect_time
    }

    /// Replaces the application name associated with the server connection.
    fn set_application(&self, name: String) {
        *self.application_name.write() = name;
    }

    /// Marks the server connection as not being used by any known application.
    fn set_undefined_application(&self) {
        self.set_application(String::from("Undefined"))
    }

    /// Reports the time a client waited to check out this server connection.
    ///
    /// `microseconds` is added to the address' `total_wait_time` and folded
    /// into the pool's `maxwait` with `fetch_max`.
    pub fn checkout_time(&self, microseconds: u64, application_name: String) {
        // Update server stats and address aggregation stats
        self.set_application(application_name);
        self.address
            .stats
            .total_wait_time
            .fetch_add(microseconds, Ordering::Relaxed);
        self.pool_stats
            .maxwait
            .fetch_max(microseconds, Ordering::Relaxed);
    }

    /// Report a query executed by a client against a server.
    ///
    /// Counts the query on this server connection as well as on the address
    /// aggregation, and adds `milliseconds` to the address' `total_query_time`.
    pub fn query(&self, milliseconds: u64, application_name: &str) {
        self.set_application(application_name.to_string());
        // Per-server counter, kept in step with the address-level one the same
        // way `transaction` and `data_sent`/`data_received` update both levels.
        self.query_count.fetch_add(1, Ordering::Relaxed);
        let address_stats = self.address_stats();
        address_stats
            .total_query_count
            .fetch_add(1, Ordering::Relaxed);
        address_stats
            .total_query_time
            .fetch_add(milliseconds, Ordering::Relaxed);
    }

    /// Report a transaction executed by a client against a server.
    ///
    /// Each individual query outside a transaction is reported as a transaction.
    /// Only the initial BEGIN counts as a transaction; the queries inside it do
    /// not count as transactions.
    pub fn transaction(&self, application_name: &str) {
        self.set_application(application_name.to_string());
        self.transaction_count.fetch_add(1, Ordering::Relaxed);
        self.address
            .stats
            .total_xact_count
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Report data sent to a server.
    pub fn data_sent(&self, amount_bytes: usize) {
        // `usize` is at most 64 bits wide on supported targets, so this cast is lossless.
        let amount = amount_bytes as u64;
        self.bytes_sent.fetch_add(amount, Ordering::Relaxed);
        self.address
            .stats
            .total_sent
            .fetch_add(amount, Ordering::Relaxed);
    }

    /// Report data received from a server.
    pub fn data_received(&self, amount_bytes: usize) {
        // `usize` is at most 64 bits wide on supported targets, so this cast is lossless.
        let amount = amount_bytes as u64;
        self.bytes_received.fetch_add(amount, Ordering::Relaxed);
        self.address
            .stats
            .total_received
            .fetch_add(amount, Ordering::Relaxed);
    }
}

View File

@@ -176,6 +176,47 @@ describe "Admin" do
end end
end end
# Checks that the client counters reported by SHOW POOLS return to their
# starting value once clients that connected normally have closed.
context "clients connects and disconnect normally" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it 'shows the same number of clients before and after' do
# Baseline: cl_idle + cl_active + cl_waiting before any test client connects.
clients_before = clients_connected_to_pool(processes: processes)
threads = []
# Open four client connections through pgcat and run a query on each one
# from its own thread.
# NOTE(review): the threads are never joined, so the queries may still be
# in flight when the counters are read or the connections are closed.
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
# While the clients are connected the total must differ from the baseline.
clients_between = clients_connected_to_pool(processes: processes)
expect(clients_before).not_to eq(clients_between)
connections.each(&:close)
# After closing every connection the total must be back at the baseline.
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
# Checks that a client that is killed, instead of closing its connection,
# does not leave the client counters reported by SHOW POOLS changed.
context "clients connects and disconnect abruptly" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
it 'shows the same number of clients before and after' do
threads = []
# Two regular clients stay connected for the whole test; each runs a query
# from its own thread.
connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
# Baseline taken with the two regular clients connected.
clients_before = clients_connected_to_pool(processes: processes)
# A random application name makes the psql processes easy to find for pkill.
random_string = (0...8).map { (65 + rand(26)).chr }.join
connection_string = "#{pgcat_conn_str}?application_name=#{random_string}"
faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null")
# Give psql time to connect before it is killed.
sleep(1)
# psql starts two processes, we only know the pid of the parent, this
# ensure both are killed
`pkill -9 -f '#{random_string}'`
Process.wait(faulty_client)
# The killed client must not be counted any more.
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients overwhelm server pools" do context "clients overwhelm server pools" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
@@ -199,7 +240,7 @@ describe "Admin" do
sleep(2.5) # Allow time for stats to update sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0] results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end end
expect(results["cl_idle"]).to eq("4") expect(results["cl_idle"]).to eq("4")

BIN
tests/ruby/capture Normal file

Binary file not shown.

View File

@@ -19,3 +19,10 @@ ensure
STDOUT.reopen(sout) STDOUT.reopen(sout)
STDERR.reopen(serr) STDERR.reopen(serr)
end end
# Returns the number of clients SHOW POOLS reports for one pool, i.e. the sum
# of its cl_idle, cl_active and cl_waiting columns.
#
# pool_index - row of the SHOW POOLS result to read (defaults to the first).
# processes  - test setup object; its pgcat admin connection string is used.
#
# The admin connection is closed in an `ensure` clause so it is released even
# when the query or the row lookup raises.
def clients_connected_to_pool(pool_index: 0, processes:)
  admin_conn = PG::connect(processes.pgcat.admin_connection_string)
  begin
    results = admin_conn.async_exec("SHOW POOLS")[pool_index]
    results['cl_idle'].to_i + results['cl_active'].to_i + results['cl_waiting'].to_i
  ensure
    admin_conn.close
  end
end