Compare commits

...

2 Commits

Author SHA1 Message Date
Lev Kokotov
341ebf4123 docs and remove Option (#58)
* docs and remove Option

* lint
2022-03-07 23:05:40 -08:00
Lev Kokotov
35828a0a8c Per-shard statistics (#57)
* per shard stats

* aight

* cleaner

* fix show lists

* comments

* more friendly

* case-insensitive

* test all shards

* ok

* HUH?
2022-03-04 17:04:27 -08:00
12 changed files with 371 additions and 162 deletions

View File

@@ -13,6 +13,9 @@ function start_pgcat() {
# Setup the database with shards and user # Setup the database with shards and user
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Install Toxiproxy to simulate a downed/slow database # Install Toxiproxy to simulate a downed/slow database
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
@@ -28,9 +31,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica
start_pgcat "info" start_pgcat "info"
# pgbench test # pgbench test
pgbench -i -h 127.0.0.1 -p 6432 && \ pgbench -i -h 127.0.0.1 -p 6432
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \ pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
# COPY TO STDOUT test # COPY TO STDOUT test
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null

10
Cargo.lock generated
View File

@@ -371,7 +371,6 @@ dependencies = [
"serde_derive", "serde_derive",
"sha-1", "sha-1",
"sqlparser", "sqlparser",
"statsd",
"tokio", "tokio",
"toml", "toml",
] ]
@@ -542,15 +541,6 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "statsd"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1efceb4bf2c0b5ebec94354285a43bbbed1375605bdf2ebe4132299434a330"
dependencies = [
"rand",
]
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.86" version = "1.0.86"

View File

@@ -20,7 +20,6 @@ serde_derive = "1"
regex = "1" regex = "1"
num_cpus = "1" num_cpus = "1"
once_cell = "1" once_cell = "1"
statsd = "0.15"
sqlparser = "0.14" sqlparser = "0.14"
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"

View File

@@ -31,7 +31,7 @@ pub async fn handle_admin(
if query.starts_with("SHOW STATS") { if query.starts_with("SHOW STATS") {
trace!("SHOW STATS"); trace!("SHOW STATS");
show_stats(stream).await show_stats(stream, &pool).await
} else if query.starts_with("RELOAD") { } else if query.starts_with("RELOAD") {
trace!("RELOAD"); trace!("RELOAD");
reload(stream).await reload(stream).await
@@ -77,11 +77,19 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
])); // but admin tools that work with pgbouncer want this ])); // but admin tools that work with pgbouncer want this
res.put(data_row(&vec![ res.put(data_row(&vec![
"free_clients".to_string(), "free_clients".to_string(),
stats["cl_idle"].to_string(), stats
.keys()
.map(|address_id| stats[&address_id]["cl_idle"])
.sum::<i64>()
.to_string(),
])); ]));
res.put(data_row(&vec![ res.put(data_row(&vec![
"used_clients".to_string(), "used_clients".to_string(),
stats["cl_active"].to_string(), stats
.keys()
.map(|address_id| stats[&address_id]["cl_active"])
.sum::<i64>()
.to_string(),
])); ]));
res.put(data_row(&vec![ res.put(data_row(&vec![
"login_clients".to_string(), "login_clients".to_string(),
@@ -89,11 +97,19 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
])); ]));
res.put(data_row(&vec![ res.put(data_row(&vec![
"free_servers".to_string(), "free_servers".to_string(),
stats["sv_idle"].to_string(), stats
.keys()
.map(|address_id| stats[&address_id]["sv_idle"])
.sum::<i64>()
.to_string(),
])); ]));
res.put(data_row(&vec![ res.put(data_row(&vec![
"used_servers".to_string(), "used_servers".to_string(),
stats["sv_active"].to_string(), stats
.keys()
.map(|address_id| stats[&address_id]["sv_active"])
.sum::<i64>()
.to_string(),
])); ]));
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()])); res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()])); res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
@@ -125,7 +141,7 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
} }
/// SHOW POOLS /// SHOW POOLS
async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Result<(), Error> { async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats(); let stats = get_stats();
let config = { let config = {
let guard = get_config(); let guard = get_config();
@@ -151,16 +167,26 @@ async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Resu
let mut res = BytesMut::new(); let mut res = BytesMut::new();
res.put(row_description(&columns)); res.put(row_description(&columns));
let mut row = vec![String::from("all"), config.user.name.clone()]; for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let stats = match stats.get(&address.id) {
Some(stats) => stats.clone(),
None => HashMap::new(),
};
for column in &columns[2..columns.len() - 1] { let mut row = vec![address.name(), config.user.name.clone()];
let value = stats.get(column.0).unwrap_or(&0).to_string();
row.push(value); for column in &columns[2..columns.len() - 1] {
let value = stats.get(column.0).unwrap_or(&0).to_string();
row.push(value);
}
row.push(config.general.pool_mode.to_string());
res.put(data_row(&row));
}
} }
row.push(config.general.pool_mode.to_string());
res.put(data_row(&row));
res.put(command_complete("SHOW")); res.put(command_complete("SHOW"));
res.put_u8(b'Z'); res.put_u8(b'Z');
@@ -309,7 +335,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
} }
/// SHOW STATS /// SHOW STATS
async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> { async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let columns = vec![ let columns = vec![
("database", DataType::Text), ("database", DataType::Text),
("total_xact_count", DataType::Numeric), ("total_xact_count", DataType::Numeric),
@@ -332,15 +358,24 @@ async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let mut res = BytesMut::new(); let mut res = BytesMut::new();
res.put(row_description(&columns)); res.put(row_description(&columns));
let mut row = vec![ for shard in 0..pool.shards() {
String::from("all"), // TODO: per-database stats, for server in 0..pool.servers(shard) {
]; let address = pool.address(shard, server);
let stats = match stats.get(&address.id) {
Some(stats) => stats.clone(),
None => HashMap::new(),
};
for column in &columns[1..] { let mut row = vec![address.name()];
row.push(stats.get(column.0).unwrap_or(&0).to_string());
for column in &columns[1..] {
row.push(stats.get(column.0).unwrap_or(&0).to_string());
}
res.put(data_row(&row));
}
} }
res.put(data_row(&row));
res.put(command_complete("SHOW")); res.put(command_complete("SHOW"));
res.put_u8(b'Z'); res.put_u8(b'Z');

View File

@@ -58,6 +58,12 @@ pub struct Client {
// Clients want to talk to admin // Clients want to talk to admin
admin: bool, admin: bool,
// Last address the client talked to
last_address_id: Option<usize>,
// Last server process id we talked to
last_server_id: Option<i32>,
} }
impl Client { impl Client {
@@ -147,6 +153,8 @@ impl Client {
parameters: parameters, parameters: parameters,
stats: stats, stats: stats,
admin: admin, admin: admin,
last_address_id: None,
last_server_id: None,
}); });
} }
@@ -169,6 +177,8 @@ impl Client {
parameters: HashMap::new(), parameters: HashMap::new(),
stats: stats, stats: stats,
admin: false, admin: false,
last_address_id: None,
last_server_id: None,
}); });
} }
@@ -219,9 +229,6 @@ impl Client {
loop { loop {
trace!("Client idle, waiting for message"); trace!("Client idle, waiting for message");
// Client idle, waiting for messages.
self.stats.client_idle(self.process_id);
// Read a complete message from the client, which normally would be // Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol). // either a `Q` (query) or `P` (prepare, extended protocol).
// We can parse it here before grabbing a server from the pool, // We can parse it here before grabbing a server from the pool,
@@ -292,13 +299,13 @@ impl Client {
continue; continue;
} }
// Waiting for server connection.
self.stats.client_waiting(self.process_id);
debug!("Waiting for connection from pool"); debug!("Waiting for connection from pool");
// Grab a server from the pool: the client issued a regular query. // Grab a server from the pool: the client issued a regular query.
let connection = match pool.get(query_router.shard(), query_router.role()).await { let connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id)
.await
{
Ok(conn) => { Ok(conn) => {
debug!("Got connection from pool"); debug!("Got connection from pool");
conn conn
@@ -312,15 +319,23 @@ impl Client {
}; };
let mut reference = connection.0; let mut reference = connection.0;
let _address = connection.1; let address = connection.1;
let server = &mut *reference; let server = &mut *reference;
// Claim this server as mine for query cancellation. // Claim this server as mine for query cancellation.
server.claim(self.process_id, self.secret_key); server.claim(self.process_id, self.secret_key);
// "disconnect" from the previous server stats-wise
if let Some(last_address_id) = self.last_address_id {
self.stats
.client_disconnecting(self.process_id, last_address_id);
}
// Client active & server active // Client active & server active
self.stats.client_active(self.process_id); self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id()); self.stats.server_active(server.process_id(), address.id);
self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id());
debug!( debug!(
"Client {:?} talking to server {:?}", "Client {:?} talking to server {:?}",
@@ -392,17 +407,17 @@ impl Client {
} }
// Report query executed statistics. // Report query executed statistics.
self.stats.query(); self.stats.query(self.process_id, address.id);
// The transaction is over, we can release the connection back to the pool. // The transaction is over, we can release the connection back to the pool.
if !server.in_transaction() { if !server.in_transaction() {
// Report transaction executed statistics. // Report transaction executed statistics.
self.stats.transaction(); self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id()); self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -478,15 +493,15 @@ impl Client {
} }
// Report query executed statistics. // Report query executed statistics.
self.stats.query(); self.stats.query(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(); self.stats.transaction(self.process_id, address.id);
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id()); self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -517,10 +532,10 @@ impl Client {
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(); self.stats.transaction(self.process_id, address.id);
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id()); self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -537,6 +552,7 @@ impl Client {
// The server is no longer bound to us, we can't cancel its queries anymore. // The server is no longer bound to us, we can't cancel its queries anymore.
debug!("Releasing server back into the pool"); debug!("Releasing server back into the pool");
self.release(); self.release();
self.stats.client_idle(self.process_id, address.id);
} }
} }
@@ -549,6 +565,14 @@ impl Client {
impl Drop for Client { impl Drop for Client {
fn drop(&mut self) { fn drop(&mut self) {
self.stats.client_disconnecting(self.process_id); // Disconnect the client
if let Some(address_id) = self.last_address_id {
self.stats.client_disconnecting(self.process_id, address_id);
// The server is now idle
if let Some(process_id) = self.last_server_id {
self.stats.server_idle(process_id, address_id);
}
}
} }
} }

View File

@@ -48,6 +48,7 @@ impl PartialEq<Role> for Option<Role> {
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
pub struct Address { pub struct Address {
pub id: usize,
pub host: String, pub host: String,
pub port: String, pub port: String,
pub shard: usize, pub shard: usize,
@@ -58,6 +59,7 @@ pub struct Address {
impl Default for Address { impl Default for Address {
fn default() -> Address { fn default() -> Address {
Address { Address {
id: 0,
host: String::from("127.0.0.1"), host: String::from("127.0.0.1"),
port: String::from("5432"), port: String::from("5432"),
shard: 0, shard: 0,

View File

@@ -31,7 +31,6 @@ extern crate once_cell;
extern crate serde; extern crate serde;
extern crate serde_derive; extern crate serde_derive;
extern crate sqlparser; extern crate sqlparser;
extern crate statsd;
extern crate tokio; extern crate tokio;
extern crate toml; extern crate toml;
@@ -113,15 +112,19 @@ async fn main() {
// Collect statistics and send them to StatsD // Collect statistics and send them to StatsD
let (tx, rx) = mpsc::channel(100); let (tx, rx) = mpsc::channel(100);
let collector_tx = tx.clone();
tokio::task::spawn(async move {
let mut stats_collector = Collector::new(rx, collector_tx);
stats_collector.collect().await;
});
// Connection pool for all shards and replicas
let mut pool = let mut pool =
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await; ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
let collector_tx = tx.clone();
let addresses = pool.databases();
tokio::task::spawn(async move {
let mut stats_collector = Collector::new(rx, collector_tx);
stats_collector.collect(addresses).await;
});
// Connect to all servers and validate their versions.
let server_info = match pool.validate().await { let server_info = match pool.validate().await {
Ok(info) => info, Ok(info) => info,
Err(err) => { Err(err) => {

View File

@@ -38,6 +38,7 @@ impl ConnectionPool {
let mut shards = Vec::new(); let mut shards = Vec::new();
let mut addresses = Vec::new(); let mut addresses = Vec::new();
let mut banlist = Vec::new(); let mut banlist = Vec::new();
let mut address_id = 0;
let mut shard_ids = config let mut shard_ids = config
.shards .shards
.clone() .clone()
@@ -63,6 +64,7 @@ impl ConnectionPool {
}; };
let address = Address { let address = Address {
id: address_id,
host: server.0.clone(), host: server.0.clone(),
port: server.1.to_string(), port: server.1.to_string(),
role: role, role: role,
@@ -70,6 +72,8 @@ impl ConnectionPool {
shard: shard_idx.parse::<usize>().unwrap(), shard: shard_idx.parse::<usize>().unwrap(),
}; };
address_id += 1;
if role == Role::Replica { if role == Role::Replica {
replica_number += 1; replica_number += 1;
} }
@@ -121,9 +125,13 @@ impl ConnectionPool {
pub async fn validate(&mut self) -> Result<BytesMut, Error> { pub async fn validate(&mut self) -> Result<BytesMut, Error> {
let mut server_infos = Vec::new(); let mut server_infos = Vec::new();
let stats = self.stats.clone();
for shard in 0..self.shards() { for shard in 0..self.shards() {
for _ in 0..self.servers(shard) { for _ in 0..self.servers(shard) {
let connection = match self.get(shard, None).await { // To keep stats consistent.
let fake_process_id = 0;
let connection = match self.get(shard, None, fake_process_id).await {
Ok(conn) => conn, Ok(conn) => conn,
Err(err) => { Err(err) => {
error!("Shard {} down or misconfigured: {:?}", shard, err); error!("Shard {} down or misconfigured: {:?}", shard, err);
@@ -137,6 +145,8 @@ impl ConnectionPool {
let server_info = server.server_info(); let server_info = server.server_info();
stats.client_disconnecting(fake_process_id, address.id);
if server_infos.len() > 0 { if server_infos.len() > 0 {
// Compare against the last server checked. // Compare against the last server checked.
if server_info != server_infos[server_infos.len() - 1] { if server_info != server_infos[server_infos.len() - 1] {
@@ -165,6 +175,7 @@ impl ConnectionPool {
&mut self, &mut self,
shard: usize, shard: usize,
role: Option<Role>, role: Option<Role>,
process_id: i32,
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
let now = Instant::now(); let now = Instant::now();
let addresses = &self.addresses[shard]; let addresses = &self.addresses[shard];
@@ -200,6 +211,8 @@ impl ConnectionPool {
let index = self.round_robin % addresses.len(); let index = self.round_robin % addresses.len();
let address = &addresses[index]; let address = &addresses[index];
self.stats.client_waiting(process_id, address.id);
// Make sure you're getting a primary or a replica // Make sure you're getting a primary or a replica
// as per request. If no specific role is requested, the first // as per request. If no specific role is requested, the first
// available will be chosen. // available will be chosen.
@@ -219,6 +232,9 @@ impl ConnectionPool {
Err(err) => { Err(err) => {
error!("Banning replica {}, error: {:?}", index, err); error!("Banning replica {}, error: {:?}", index, err);
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
}; };
@@ -227,7 +243,7 @@ impl ConnectionPool {
let server = &mut *conn; let server = &mut *conn;
let healthcheck_timeout = get_config().general.healthcheck_timeout; let healthcheck_timeout = get_config().general.healthcheck_timeout;
self.stats.server_tested(server.process_id()); self.stats.server_tested(server.process_id(), address.id);
match tokio::time::timeout( match tokio::time::timeout(
tokio::time::Duration::from_millis(healthcheck_timeout), tokio::time::Duration::from_millis(healthcheck_timeout),
@@ -238,8 +254,9 @@ impl ConnectionPool {
// Check if health check succeeded // Check if health check succeeded
Ok(res) => match res { Ok(res) => match res {
Ok(_) => { Ok(_) => {
self.stats.checkout_time(now.elapsed().as_micros()); self.stats
self.stats.server_idle(conn.process_id()); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
Err(_) => { Err(_) => {
@@ -248,6 +265,9 @@ impl ConnectionPool {
server.mark_bad(); server.mark_bad();
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
}, },
@@ -258,6 +278,9 @@ impl ConnectionPool {
server.mark_bad(); server.mark_bad();
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
} }
@@ -395,13 +418,14 @@ impl ManageConnection for ServerPool {
async fn connect(&self) -> Result<Self::Connection, Self::Error> { async fn connect(&self) -> Result<Self::Connection, Self::Error> {
info!( info!(
"Creating a new connection to {:?} using user {:?}", "Creating a new connection to {:?} using user {:?}",
self.address, self.user.name self.address.name(),
self.user.name
); );
// Put a temporary process_id into the stats // Put a temporary process_id into the stats
// for server login. // for server login.
let process_id = rand::random::<i32>(); let process_id = rand::random::<i32>();
self.stats.server_login(process_id); self.stats.server_login(process_id, self.address.id);
match Server::startup( match Server::startup(
&self.address, &self.address,
@@ -414,12 +438,12 @@ impl ManageConnection for ServerPool {
{ {
Ok(conn) => { Ok(conn) => {
// Remove the temporary process_id from the stats. // Remove the temporary process_id from the stats.
self.stats.server_disconnecting(process_id); self.stats.server_disconnecting(process_id, self.address.id);
Ok(conn) Ok(conn)
} }
Err(err) => { Err(err) => {
// Remove the temporary process_id from the stats. // Remove the temporary process_id from the stats.
self.stats.server_disconnecting(process_id); self.stats.server_disconnecting(process_id, self.address.id);
Err(err) Err(err)
} }
} }

View File

@@ -12,7 +12,7 @@ use sqlparser::parser::Parser;
const CUSTOM_SQL_REGEXES: [&str; 5] = [ const CUSTOM_SQL_REGEXES: [&str; 5] = [
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$", r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
r"(?i)^ *SET SHARD TO '?([0-9]+)'? *;? *$", r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
r"(?i)^ *SHOW SHARD *;? *$", r"(?i)^ *SHOW SHARD *;? *$",
r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$", r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
r"(?i)^ *SHOW SERVER ROLE *;? *$", r"(?i)^ *SHOW SERVER ROLE *;? *$",
@@ -192,7 +192,10 @@ impl QueryRouter {
} }
Command::SetShard => { Command::SetShard => {
self.active_shard = Some(value.parse::<usize>().unwrap()); self.active_shard = match value.to_ascii_uppercase().as_ref() {
"ANY" => Some(rand::random::<usize>() % self.shards),
_ => Some(value.parse::<usize>().unwrap()),
};
} }
Command::SetServerRole => { Command::SetServerRole => {

View File

@@ -268,7 +268,8 @@ impl Server {
/// Send messages to the server from the client. /// Send messages to the server from the client.
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
self.stats.data_sent(messages.len()); self.stats
.data_sent(messages.len(), self.process_id, self.address.id);
match write_all_half(&mut self.write, messages).await { match write_all_half(&mut self.write, messages).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
@@ -374,7 +375,8 @@ impl Server {
let bytes = self.buffer.clone(); let bytes = self.buffer.clone();
// Keep track of how much data we got from the server for stats. // Keep track of how much data we got from the server for stats.
self.stats.data_received(bytes.len()); self.stats
.data_received(bytes.len(), self.process_id, self.address.id);
// Clear the buffer for next query. // Clear the buffer for next query.
self.buffer.clear(); self.buffer.clear();
@@ -470,7 +472,8 @@ impl Drop for Server {
/// the socket is in non-blocking mode, so it may not be ready /// the socket is in non-blocking mode, so it may not be ready
/// for a write. /// for a write.
fn drop(&mut self) { fn drop(&mut self) {
self.stats.server_disconnecting(self.process_id()); self.stats
.server_disconnecting(self.process_id(), self.address.id);
let mut bytes = BytesMut::with_capacity(4); let mut bytes = BytesMut::with_capacity(4);
bytes.put_u8(b'X'); bytes.put_u8(b'X');

View File

@@ -1,17 +1,20 @@
use log::{debug, info}; /// Statistics and reporting.
use log::info;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use parking_lot::Mutex; use parking_lot::Mutex;
use statsd::Client; use std::collections::HashMap;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use std::collections::HashMap; // Latest stats updated every second; used in SHOW STATS and other admin commands.
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
use crate::config::get_config; // Statistics period used for average calculations.
// 15 seconds.
// Stats used in SHOW STATS static STAT_PERIOD: u64 = 15000;
static LATEST_STATS: Lazy<Mutex<HashMap<String, i64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
static STAT_PERIOD: u64 = 15000; //15 seconds
/// The names for the events reported
/// to the statistics collector.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum EventName { enum EventName {
CheckoutTime, CheckoutTime,
@@ -28,186 +31,253 @@ enum EventName {
ServerTested, ServerTested,
ServerLogin, ServerLogin,
ServerDisconnecting, ServerDisconnecting,
FlushStatsToStatsD, UpdateStats,
UpdateAverages,
} }
/// Event data sent to the collector
/// from clients and servers.
#[derive(Debug)] #[derive(Debug)]
pub struct Event { pub struct Event {
/// The name of the event being reported.
name: EventName, name: EventName,
/// The value being reported. Meaning differs based on event name.
value: i64, value: i64,
process_id: Option<i32>,
/// The client or server connection reporting the event.
process_id: i32,
/// The server the client is connected to.
address_id: usize,
} }
/// The statistics reporter. An instance is given
/// to each possible source of statistics,
/// e.g. clients, servers, connection pool.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Reporter { pub struct Reporter {
tx: Sender<Event>, tx: Sender<Event>,
} }
impl Reporter { impl Reporter {
/// Create a new Reporter instance.
pub fn new(tx: Sender<Event>) -> Reporter { pub fn new(tx: Sender<Event>) -> Reporter {
Reporter { tx: tx } Reporter { tx: tx }
} }
pub fn query(&self) { /// Report a query executed by a client against
/// a server identified by the `address_id`.
pub fn query(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::Query, name: EventName::Query,
value: 1, value: 1,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn transaction(&self) { /// Report a transaction executed by a client against
/// a server identified by the `address_id`.
pub fn transaction(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::Transaction, name: EventName::Transaction,
value: 1, value: 1,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn data_sent(&self, amount: usize) { /// Report data sent to a server identified by `address_id`.
/// The `amount` is measured in bytes.
pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::DataSent, name: EventName::DataSent,
value: amount as i64, value: amount as i64,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn data_received(&self, amount: usize) { /// Report data received from a server identified by `address_id`.
/// The `amount` is measured in bytes.
pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::DataReceived, name: EventName::DataReceived,
value: amount as i64, value: amount as i64,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn checkout_time(&self, ms: u128) { /// Time spent waiting to get a healthy connection from the pool
/// for a server identified by `address_id`.
/// Measured in milliseconds.
pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::CheckoutTime, name: EventName::CheckoutTime,
value: ms as i64, value: ms as i64,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn client_waiting(&self, process_id: i32) { /// Reports a client identified by `process_id` waiting for a connection
/// to a server identified by `address_id`.
pub fn client_waiting(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientWaiting, name: EventName::ClientWaiting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn client_active(&self, process_id: i32) { /// Reports a client identified by `process_id` is done waiting for a connection
/// to a server identified by `address_id` and is about to query the server.
pub fn client_active(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientActive, name: EventName::ClientActive,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn client_idle(&self, process_id: i32) { /// Reports a client identified by `process_id` is done querying the server
/// identified by `address_id` and is no longer active.
pub fn client_idle(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientIdle, name: EventName::ClientIdle,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn client_disconnecting(&self, process_id: i32) { /// Reports a client identified by `process_id` is disconnecting from the pooler.
/// The last server it was connected to is identified by `address_id`.
pub fn client_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ClientDisconnecting, name: EventName::ClientDisconnecting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn server_active(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is actively used
/// by a client.
pub fn server_active(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerActive, name: EventName::ServerActive,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn server_idle(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is no longer
/// actively used by a client and is now idle.
pub fn server_idle(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerIdle, name: EventName::ServerIdle,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn server_login(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is attempting
/// to login.
pub fn server_login(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerLogin, name: EventName::ServerLogin,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn server_tested(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
/// a configured server identified by `address_id` is being
/// tested before being given to a client.
pub fn server_tested(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerTested, name: EventName::ServerTested,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
pub fn server_disconnecting(&self, process_id: i32) { /// Reports a server connection identified by `process_id` is disconecting from the pooler.
/// The configured server it was connected to is identified by `address_id`.
pub fn server_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event { let event = Event {
name: EventName::ServerDisconnecting, name: EventName::ServerDisconnecting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(event); let _ = self.tx.try_send(event);
} }
} }
/// The statistics collector which is receiving statistics
/// from clients, servers, and the connection pool. There is
/// only one collector (kind of like a singleton).
/// The collector can trigger events on its own, e.g.
/// it updates aggregates every second and averages every
/// 15 seconds.
pub struct Collector { pub struct Collector {
rx: Receiver<Event>, rx: Receiver<Event>,
tx: Sender<Event>, tx: Sender<Event>,
client: Client,
} }
impl Collector { impl Collector {
/// Create a new collector instance. There should only be one instance
/// at a time. This is ensured by mpsc which allows only one receiver.
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector { pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
Collector { Collector { rx, tx }
rx,
tx,
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
}
} }
pub async fn collect(&mut self) { /// The statistics collection handler. It will collect statistics
/// for `address_id`s starting at 0 up to `addresses`.
pub async fn collect(&mut self, addresses: usize) {
info!("Events reporter started"); info!("Events reporter started");
let mut stats = HashMap::from([ let stats_template = HashMap::from([
("total_query_count", 0), ("total_query_count", 0),
("total_xact_count", 0), ("total_xact_count", 0),
("total_sent", 0), ("total_sent", 0),
@@ -232,25 +302,47 @@ impl Collector {
("sv_tested", 0), ("sv_tested", 0),
]); ]);
let mut stats = HashMap::new();
// Stats saved after each iteration of the flush event. Used in calculation // Stats saved after each iteration of the flush event. Used in calculation
// of averages in the last flush period. // of averages in the last flush period.
let mut old_stats: HashMap<String, i64> = HashMap::new(); let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new();
// Track which state the client and server are at any given time. // Track which state the client and server are at any given time.
let mut client_server_states: HashMap<i32, EventName> = HashMap::new(); let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
// Flush stats to StatsD and calculate averages every 15 seconds. // Flush stats to StatsD and calculate averages every 15 seconds.
let tx = self.tx.clone();
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15));
loop {
interval.tick().await;
for address_id in 0..addresses {
let _ = tx.try_send(Event {
name: EventName::UpdateStats,
value: 0,
process_id: -1,
address_id: address_id,
});
}
}
});
let tx = self.tx.clone(); let tx = self.tx.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
let mut interval = let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD)); tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
loop { loop {
interval.tick().await; interval.tick().await;
let _ = tx.try_send(Event { for address_id in 0..addresses {
name: EventName::FlushStatsToStatsD, let _ = tx.try_send(Event {
value: 0, name: EventName::UpdateAverages,
process_id: None, value: 0,
}); process_id: -1,
address_id: address_id,
});
}
} }
}); });
@@ -264,6 +356,14 @@ impl Collector {
} }
}; };
let stats = stats
.entry(stat.address_id)
.or_insert(stats_template.clone());
let client_server_states = client_server_states
.entry(stat.address_id)
.or_insert(HashMap::new());
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
// Some are counters, some are gauges... // Some are counters, some are gauges...
match stat.name { match stat.name {
EventName::Query => { EventName::Query => {
@@ -313,16 +413,16 @@ impl Collector {
| EventName::ServerIdle | EventName::ServerIdle
| EventName::ServerTested | EventName::ServerTested
| EventName::ServerLogin => { | EventName::ServerLogin => {
client_server_states.insert(stat.process_id.unwrap(), stat.name); client_server_states.insert(stat.process_id, stat.name);
} }
EventName::ClientDisconnecting | EventName::ServerDisconnecting => { EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
client_server_states.remove(&stat.process_id.unwrap()); client_server_states.remove(&stat.process_id);
} }
EventName::FlushStatsToStatsD => { EventName::UpdateStats => {
// Calculate connection states // Calculate connection states
for (_, state) in &client_server_states { for (_, state) in client_server_states.iter() {
match state { match state {
EventName::ClientActive => { EventName::ClientActive => {
let counter = stats.entry("cl_active").or_insert(0); let counter = stats.entry("cl_active").or_insert(0);
@@ -334,11 +434,6 @@ impl Collector {
*counter += 1; *counter += 1;
} }
EventName::ClientIdle => {
let counter = stats.entry("cl_idle").or_insert(0);
*counter += 1;
}
EventName::ServerIdle => { EventName::ServerIdle => {
let counter = stats.entry("sv_idle").or_insert(0); let counter = stats.entry("sv_idle").or_insert(0);
*counter += 1; *counter += 1;
@@ -359,39 +454,20 @@ impl Collector {
*counter += 1; *counter += 1;
} }
EventName::ClientIdle => {
let counter = stats.entry("cl_idle").or_insert(0);
*counter += 1;
}
_ => unreachable!(), _ => unreachable!(),
}; };
} }
// Calculate averages
for stat in &[
"avg_query_count",
"avgxact_count",
"avg_sent",
"avg_received",
"avg_wait_time",
] {
let total_name = stat.replace("avg_", "total_");
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
stats.insert(stat, avg);
*old_value = new_value;
}
debug!("{:?}", stats);
// Update latest stats used in SHOW STATS // Update latest stats used in SHOW STATS
let mut guard = LATEST_STATS.lock(); let mut guard = LATEST_STATS.lock();
for (key, value) in &stats {
guard.insert(key.to_string(), value.clone());
}
let mut pipeline = self.client.pipeline();
for (key, value) in stats.iter() { for (key, value) in stats.iter() {
pipeline.gauge(key, *value as f64); let entry = guard.entry(stat.address_id).or_insert(HashMap::new());
entry.insert(key.to_string(), value.clone());
} }
// These are re-calculated every iteration of the loop, so we don't want to add values // These are re-calculated every iteration of the loop, so we don't want to add values
@@ -409,14 +485,33 @@ impl Collector {
] { ] {
stats.insert(stat, 0); stats.insert(stat, 0);
} }
}
pipeline.send(&self.client); EventName::UpdateAverages => {
// Calculate averages
for stat in &[
"avg_query_count",
"avgxact_count",
"avg_sent",
"avg_received",
"avg_wait_time",
] {
let total_name = stat.replace("avg_", "total_");
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
stats.insert(stat, avg);
*old_value = new_value;
}
} }
}; };
} }
} }
} }
pub fn get_stats() -> HashMap<String, i64> { /// Get a snapshot of statistics. Updated once a second
/// by the `Collector`.
pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
LATEST_STATS.lock().clone() LATEST_STATS.lock().clone()
} }

28
tests/pgbench/simple.sql Normal file
View File

@@ -0,0 +1,28 @@
-- \setrandom aid 1 :naccounts
\set aid random(1, 100000)
-- \setrandom bid 1 :nbranches
\set bid random(1, 100000)
-- \setrandom tid 1 :ntellers
\set tid random(1, 100000)
-- \setrandom delta -5000 5000
\set delta random(-5000,5000)
\set shard random(0, 2)
SET SHARD TO :shard;
BEGIN;
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
END;