mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
Compare commits
12 Commits
query-rout
...
v0.1.0-bet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
341ebf4123 | ||
|
|
35828a0a8c | ||
|
|
1e8fa110ae | ||
|
|
d4186b7815 | ||
|
|
aaeef69d59 | ||
|
|
b21e0f4a7e | ||
|
|
eb1473060e | ||
|
|
26f75f8d5d | ||
|
|
99d65fc475 | ||
|
|
206fdc9769 | ||
|
|
f74101cdfe | ||
|
|
8e0682482d |
@@ -13,6 +13,9 @@ function start_pgcat() {
|
||||
|
||||
# Setup the database with shards and user
|
||||
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||
|
||||
# Install Toxiproxy to simulate a downed/slow database
|
||||
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
|
||||
@@ -28,9 +31,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica
|
||||
start_pgcat "info"
|
||||
|
||||
# pgbench test
|
||||
pgbench -i -h 127.0.0.1 -p 6432 && \
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
||||
pgbench -i -h 127.0.0.1 -p 6432
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
||||
|
||||
# COPY TO STDOUT test
|
||||
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null
|
||||
@@ -57,6 +60,17 @@ cd tests/ruby && \
|
||||
ruby tests.rb && \
|
||||
cd ../..
|
||||
|
||||
# Admin tests
|
||||
psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
||||
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||
|
||||
# Start PgCat in debug to demonstrate failover better
|
||||
start_pgcat "debug"
|
||||
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -371,7 +371,6 @@ dependencies = [
|
||||
"serde_derive",
|
||||
"sha-1",
|
||||
"sqlparser",
|
||||
"statsd",
|
||||
"tokio",
|
||||
"toml",
|
||||
]
|
||||
@@ -542,15 +541,6 @@ dependencies = [
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "statsd"
|
||||
version = "0.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df1efceb4bf2c0b5ebec94354285a43bbbed1375605bdf2ebe4132299434a330"
|
||||
dependencies = [
|
||||
"rand",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.86"
|
||||
|
||||
@@ -20,7 +20,6 @@ serde_derive = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
statsd = "0.15"
|
||||
sqlparser = "0.14"
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
|
||||
386
src/admin.rs
Normal file
386
src/admin.rs
Normal file
@@ -0,0 +1,386 @@
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{info, trace};
|
||||
use tokio::net::tcp::OwnedWriteHalf;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::config::{get_config, parse};
|
||||
use crate::errors::Error;
|
||||
use crate::messages::*;
|
||||
use crate::pool::ConnectionPool;
|
||||
use crate::stats::get_stats;
|
||||
|
||||
/// Handle admin client
|
||||
pub async fn handle_admin(
|
||||
stream: &mut OwnedWriteHalf,
|
||||
mut query: BytesMut,
|
||||
pool: ConnectionPool,
|
||||
) -> Result<(), Error> {
|
||||
let code = query.get_u8() as char;
|
||||
|
||||
if code != 'Q' {
|
||||
return Err(Error::ProtocolSyncError);
|
||||
}
|
||||
|
||||
let len = query.get_i32() as usize;
|
||||
let query = String::from_utf8_lossy(&query[..len - 5])
|
||||
.to_string()
|
||||
.to_ascii_uppercase();
|
||||
|
||||
trace!("Admin query: {}", query);
|
||||
|
||||
if query.starts_with("SHOW STATS") {
|
||||
trace!("SHOW STATS");
|
||||
show_stats(stream, &pool).await
|
||||
} else if query.starts_with("RELOAD") {
|
||||
trace!("RELOAD");
|
||||
reload(stream).await
|
||||
} else if query.starts_with("SHOW CONFIG") {
|
||||
trace!("SHOW CONFIG");
|
||||
show_config(stream).await
|
||||
} else if query.starts_with("SHOW DATABASES") {
|
||||
trace!("SHOW DATABASES");
|
||||
show_databases(stream, &pool).await
|
||||
} else if query.starts_with("SHOW POOLS") {
|
||||
trace!("SHOW POOLS");
|
||||
show_pools(stream, &pool).await
|
||||
} else if query.starts_with("SHOW LISTS") {
|
||||
trace!("SHOW LISTS");
|
||||
show_lists(stream, &pool).await
|
||||
} else if query.starts_with("SHOW VERSION") {
|
||||
trace!("SHOW VERSION");
|
||||
show_version(stream).await
|
||||
} else if query.starts_with("SET ") {
|
||||
trace!("SET");
|
||||
ignore_set(stream).await
|
||||
} else {
|
||||
error_response(stream, "Unsupported query against the admin database").await
|
||||
}
|
||||
}
|
||||
|
||||
/// SHOW LISTS
|
||||
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
let stats = get_stats();
|
||||
|
||||
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
res.put(data_row(&vec![
|
||||
"databases".to_string(),
|
||||
(pool.databases() + 1).to_string(), // see comment below
|
||||
]));
|
||||
res.put(data_row(&vec!["users".to_string(), "1".to_string()]));
|
||||
res.put(data_row(&vec![
|
||||
"pools".to_string(),
|
||||
(pool.databases() + 1).to_string(), // +1 for the pgbouncer admin db pool which isn't real
|
||||
])); // but admin tools that work with pgbouncer want this
|
||||
res.put(data_row(&vec![
|
||||
"free_clients".to_string(),
|
||||
stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["cl_idle"])
|
||||
.sum::<i64>()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_clients".to_string(),
|
||||
stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["cl_active"])
|
||||
.sum::<i64>()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"login_clients".to_string(),
|
||||
"0".to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"free_servers".to_string(),
|
||||
stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["sv_idle"])
|
||||
.sum::<i64>()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_servers".to_string(),
|
||||
stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["sv_active"])
|
||||
.sum::<i64>()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
|
||||
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
|
||||
res.put(data_row(&vec!["dns_queries".to_string(), "0".to_string()]));
|
||||
res.put(data_row(&vec!["dns_pending".to_string(), "0".to_string()]));
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW VERSION
|
||||
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
res.put(row_description(&vec![("version", DataType::Text)]));
|
||||
res.put(data_row(&vec!["PgCat 0.1.0".to_string()]));
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW POOLS
|
||||
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
let stats = get_stats();
|
||||
let config = {
|
||||
let guard = get_config();
|
||||
&*guard.clone()
|
||||
};
|
||||
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("cl_active", DataType::Numeric),
|
||||
("cl_waiting", DataType::Numeric),
|
||||
("cl_cancel_req", DataType::Numeric),
|
||||
("sv_active", DataType::Numeric),
|
||||
("sv_idle", DataType::Numeric),
|
||||
("sv_used", DataType::Numeric),
|
||||
("sv_tested", DataType::Numeric),
|
||||
("sv_login", DataType::Numeric),
|
||||
("maxwait", DataType::Numeric),
|
||||
("maxwait_us", DataType::Numeric),
|
||||
("pool_mode", DataType::Text),
|
||||
];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let stats = match stats.get(&address.id) {
|
||||
Some(stats) => stats.clone(),
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let mut row = vec![address.name(), config.user.name.clone()];
|
||||
|
||||
for column in &columns[2..columns.len() - 1] {
|
||||
let value = stats.get(column.0).unwrap_or(&0).to_string();
|
||||
row.push(value);
|
||||
}
|
||||
|
||||
row.push(config.general.pool_mode.to_string());
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW DATABASES
|
||||
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
let guard = get_config();
|
||||
let config = &*guard.clone();
|
||||
drop(guard);
|
||||
|
||||
// Columns
|
||||
let columns = vec![
|
||||
("name", DataType::Text),
|
||||
("host", DataType::Text),
|
||||
("port", DataType::Text),
|
||||
("database", DataType::Text),
|
||||
("force_user", DataType::Text),
|
||||
("pool_size", DataType::Int4),
|
||||
("min_pool_size", DataType::Int4),
|
||||
("reserve_pool", DataType::Int4),
|
||||
("pool_mode", DataType::Text),
|
||||
("max_connections", DataType::Int4),
|
||||
("current_connections", DataType::Int4),
|
||||
("paused", DataType::Int4),
|
||||
("disabled", DataType::Int4),
|
||||
];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
// RowDescription
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for shard in 0..pool.shards() {
|
||||
let database_name = &config.shards[&shard.to_string()].database;
|
||||
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let pool_state = pool.pool_state(shard, server);
|
||||
|
||||
res.put(data_row(&vec![
|
||||
address.name(), // name
|
||||
address.host.to_string(), // host
|
||||
address.port.to_string(), // port
|
||||
database_name.to_string(), // database
|
||||
config.user.name.to_string(), // force_user
|
||||
config.general.pool_size.to_string(), // pool_size
|
||||
"0".to_string(), // min_pool_size
|
||||
"0".to_string(), // reserve_pool
|
||||
config.general.pool_mode.to_string(), // pool_mode
|
||||
config.general.pool_size.to_string(), // max_connections
|
||||
pool_state.connections.to_string(), // current_connections
|
||||
"0".to_string(), // paused
|
||||
"0".to_string(), // disabled
|
||||
]));
|
||||
}
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// Ignore any SET commands the client sends.
|
||||
/// This is common initialization done by ORMs.
|
||||
async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
custom_protocol_response_ok(stream, "SET").await
|
||||
}
|
||||
|
||||
/// RELOAD
|
||||
async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
info!("Reloading config");
|
||||
|
||||
let config = get_config();
|
||||
let path = config.path.clone().unwrap();
|
||||
|
||||
parse(&path).await?;
|
||||
|
||||
let config = get_config();
|
||||
|
||||
config.show();
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
// CommandComplete
|
||||
res.put(command_complete("RELOAD"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
let guard = get_config();
|
||||
let config = &*guard.clone();
|
||||
let config: HashMap<String, String> = config.into();
|
||||
drop(guard);
|
||||
|
||||
// Configs that cannot be changed dynamically.
|
||||
let immutables = ["host", "port", "connect_timeout"];
|
||||
|
||||
// Columns
|
||||
let columns = vec![
|
||||
("key", DataType::Text),
|
||||
("value", DataType::Text),
|
||||
("default", DataType::Text),
|
||||
("changeable", DataType::Text),
|
||||
];
|
||||
|
||||
// Response data
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
// DataRow rows
|
||||
for (key, value) in config {
|
||||
let changeable = if immutables.iter().filter(|col| *col == &key).count() == 1 {
|
||||
"no".to_string()
|
||||
} else {
|
||||
"yes".to_string()
|
||||
};
|
||||
|
||||
let row = vec![key, value, "-".to_string(), changeable];
|
||||
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW STATS
|
||||
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("total_xact_count", DataType::Numeric),
|
||||
("total_query_count", DataType::Numeric),
|
||||
("total_received", DataType::Numeric),
|
||||
("total_sent", DataType::Numeric),
|
||||
("total_xact_time", DataType::Numeric),
|
||||
("total_query_time", DataType::Numeric),
|
||||
("total_wait_time", DataType::Numeric),
|
||||
("avg_xact_count", DataType::Numeric),
|
||||
("avg_query_count", DataType::Numeric),
|
||||
("avg_recv", DataType::Numeric),
|
||||
("avg_sent", DataType::Numeric),
|
||||
("avg_xact_time", DataType::Numeric),
|
||||
("avg_query_time", DataType::Numeric),
|
||||
("avg_wait_time", DataType::Numeric),
|
||||
];
|
||||
|
||||
let stats = get_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let stats = match stats.get(&address.id) {
|
||||
Some(stats) => stats.clone(),
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let mut row = vec![address.name()];
|
||||
|
||||
for column in &columns[1..] {
|
||||
row.push(stats.get(column.0).unwrap_or(&0).to_string());
|
||||
}
|
||||
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
@@ -11,6 +11,7 @@ use tokio::net::{
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::admin::handle_admin;
|
||||
use crate::config::get_config;
|
||||
use crate::constants::*;
|
||||
use crate::errors::Error;
|
||||
@@ -54,6 +55,15 @@ pub struct Client {
|
||||
|
||||
// Statistics
|
||||
stats: Reporter,
|
||||
|
||||
// Clients want to talk to admin
|
||||
admin: bool,
|
||||
|
||||
// Last address the client talked to
|
||||
last_address_id: Option<usize>,
|
||||
|
||||
// Last server process id we talked to
|
||||
last_server_id: Option<i32>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
@@ -118,6 +128,15 @@ impl Client {
|
||||
ready_for_query(&mut stream).await?;
|
||||
trace!("Startup OK");
|
||||
|
||||
let database = parameters
|
||||
.get("database")
|
||||
.unwrap_or(parameters.get("user").unwrap());
|
||||
let admin = ["pgcat", "pgbouncer"]
|
||||
.iter()
|
||||
.filter(|db| *db == &database)
|
||||
.count()
|
||||
== 1;
|
||||
|
||||
// Split the read and write streams
|
||||
// so we can control buffering.
|
||||
let (read, write) = stream.into_split();
|
||||
@@ -133,6 +152,9 @@ impl Client {
|
||||
client_server_map: client_server_map,
|
||||
parameters: parameters,
|
||||
stats: stats,
|
||||
admin: admin,
|
||||
last_address_id: None,
|
||||
last_server_id: None,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -154,6 +176,9 @@ impl Client {
|
||||
client_server_map: client_server_map,
|
||||
parameters: HashMap::new(),
|
||||
stats: stats,
|
||||
admin: false,
|
||||
last_address_id: None,
|
||||
last_server_id: None,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -204,9 +229,6 @@ impl Client {
|
||||
loop {
|
||||
trace!("Client idle, waiting for message");
|
||||
|
||||
// Client idle, waiting for messages.
|
||||
self.stats.client_idle(self.process_id);
|
||||
|
||||
// Read a complete message from the client, which normally would be
|
||||
// either a `Q` (query) or `P` (prepare, extended protocol).
|
||||
// We can parse it here before grabbing a server from the pool,
|
||||
@@ -220,6 +242,13 @@ impl Client {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Handle admin database real quick
|
||||
if self.admin {
|
||||
trace!("Handling admin command");
|
||||
handle_admin(&mut self.write, message, pool.clone()).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle all custom protocol commands here.
|
||||
match query_router.try_execute_command(message.clone()) {
|
||||
// Normal query
|
||||
@@ -270,13 +299,13 @@ impl Client {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Waiting for server connection.
|
||||
self.stats.client_waiting(self.process_id);
|
||||
|
||||
debug!("Waiting for connection from pool");
|
||||
|
||||
// Grab a server from the pool: the client issued a regular query.
|
||||
let connection = match pool.get(query_router.shard(), query_router.role()).await {
|
||||
let connection = match pool
|
||||
.get(query_router.shard(), query_router.role(), self.process_id)
|
||||
.await
|
||||
{
|
||||
Ok(conn) => {
|
||||
debug!("Got connection from pool");
|
||||
conn
|
||||
@@ -290,15 +319,23 @@ impl Client {
|
||||
};
|
||||
|
||||
let mut reference = connection.0;
|
||||
let _address = connection.1;
|
||||
let address = connection.1;
|
||||
let server = &mut *reference;
|
||||
|
||||
// Claim this server as mine for query cancellation.
|
||||
server.claim(self.process_id, self.secret_key);
|
||||
|
||||
// "disconnect" from the previous server stats-wise
|
||||
if let Some(last_address_id) = self.last_address_id {
|
||||
self.stats
|
||||
.client_disconnecting(self.process_id, last_address_id);
|
||||
}
|
||||
|
||||
// Client active & server active
|
||||
self.stats.client_active(self.process_id);
|
||||
self.stats.server_active(server.process_id());
|
||||
self.stats.client_active(self.process_id, address.id);
|
||||
self.stats.server_active(server.process_id(), address.id);
|
||||
self.last_address_id = Some(address.id);
|
||||
self.last_server_id = Some(server.process_id());
|
||||
|
||||
debug!(
|
||||
"Client {:?} talking to server {:?}",
|
||||
@@ -370,17 +407,17 @@ impl Client {
|
||||
}
|
||||
|
||||
// Report query executed statistics.
|
||||
self.stats.query();
|
||||
self.stats.query(self.process_id, address.id);
|
||||
|
||||
// The transaction is over, we can release the connection back to the pool.
|
||||
if !server.in_transaction() {
|
||||
// Report transaction executed statistics.
|
||||
self.stats.transaction();
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
self.stats.server_idle(server.process_id());
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -456,15 +493,15 @@ impl Client {
|
||||
}
|
||||
|
||||
// Report query executed statistics.
|
||||
self.stats.query();
|
||||
self.stats.query(self.process_id, address.id);
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction();
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
|
||||
if self.transaction_mode {
|
||||
self.stats.server_idle(server.process_id());
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -495,10 +532,10 @@ impl Client {
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction();
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
|
||||
if self.transaction_mode {
|
||||
self.stats.server_idle(server.process_id());
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -515,6 +552,7 @@ impl Client {
|
||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||
debug!("Releasing server back into the pool");
|
||||
self.release();
|
||||
self.stats.client_idle(self.process_id, address.id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -527,6 +565,14 @@ impl Client {
|
||||
|
||||
impl Drop for Client {
|
||||
fn drop(&mut self) {
|
||||
self.stats.client_disconnecting(self.process_id);
|
||||
// Disconnect the client
|
||||
if let Some(address_id) = self.last_address_id {
|
||||
self.stats.client_disconnecting(self.process_id, address_id);
|
||||
|
||||
// The server is now idle
|
||||
if let Some(process_id) = self.last_server_id {
|
||||
self.stats.server_idle(process_id, address_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,15 @@ pub enum Role {
|
||||
Replica,
|
||||
}
|
||||
|
||||
impl ToString for Role {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
Role::Primary => "primary".to_string(),
|
||||
Role::Replica => "replica".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq<Option<Role>> for Role {
|
||||
fn eq(&self, other: &Option<Role>) -> bool {
|
||||
match other {
|
||||
@@ -39,23 +48,37 @@ impl PartialEq<Role> for Option<Role> {
|
||||
|
||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
|
||||
pub struct Address {
|
||||
pub id: usize,
|
||||
pub host: String,
|
||||
pub port: String,
|
||||
pub shard: usize,
|
||||
pub role: Role,
|
||||
pub replica_number: usize,
|
||||
}
|
||||
|
||||
impl Default for Address {
|
||||
fn default() -> Address {
|
||||
Address {
|
||||
id: 0,
|
||||
host: String::from("127.0.0.1"),
|
||||
port: String::from("5432"),
|
||||
shard: 0,
|
||||
replica_number: 0,
|
||||
role: Role::Replica,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Address {
|
||||
pub fn name(&self) -> String {
|
||||
match self.role {
|
||||
Role::Primary => format!("shard_{}_primary", self.shard),
|
||||
|
||||
Role::Replica => format!("shard_{}_replica_{}", self.shard, self.replica_number),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
|
||||
pub struct User {
|
||||
pub name: String,
|
||||
@@ -134,6 +157,7 @@ impl Default for QueryRouter {
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub path: Option<String>,
|
||||
pub general: General,
|
||||
pub user: User,
|
||||
pub shards: HashMap<String, Shard>,
|
||||
@@ -143,6 +167,7 @@ pub struct Config {
|
||||
impl Default for Config {
|
||||
fn default() -> Config {
|
||||
Config {
|
||||
path: Some(String::from("pgcat.toml")),
|
||||
general: General::default(),
|
||||
user: User::default(),
|
||||
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
||||
@@ -151,6 +176,52 @@ impl Default for Config {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Config> for std::collections::HashMap<String, String> {
|
||||
fn from(config: &Config) -> HashMap<String, String> {
|
||||
HashMap::from([
|
||||
("host".to_string(), config.general.host.to_string()),
|
||||
("port".to_string(), config.general.port.to_string()),
|
||||
(
|
||||
"pool_size".to_string(),
|
||||
config.general.pool_size.to_string(),
|
||||
),
|
||||
(
|
||||
"pool_mode".to_string(),
|
||||
config.general.pool_mode.to_string(),
|
||||
),
|
||||
(
|
||||
"connect_timeout".to_string(),
|
||||
config.general.connect_timeout.to_string(),
|
||||
),
|
||||
(
|
||||
"healthcheck_timeout".to_string(),
|
||||
config.general.healthcheck_timeout.to_string(),
|
||||
),
|
||||
("ban_time".to_string(), config.general.ban_time.to_string()),
|
||||
(
|
||||
"statsd_address".to_string(),
|
||||
config.general.statsd_address.to_string(),
|
||||
),
|
||||
(
|
||||
"default_role".to_string(),
|
||||
config.query_router.default_role.to_string(),
|
||||
),
|
||||
(
|
||||
"query_parser_enabled".to_string(),
|
||||
config.query_router.query_parser_enabled.to_string(),
|
||||
),
|
||||
(
|
||||
"primary_reads_enabled".to_string(),
|
||||
config.query_router.primary_reads_enabled.to_string(),
|
||||
),
|
||||
(
|
||||
"sharding_function".to_string(),
|
||||
config.query_router.sharding_function.to_string(),
|
||||
),
|
||||
])
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn show(&self) {
|
||||
info!("Pool size: {}", self.general.pool_size);
|
||||
@@ -189,7 +260,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
let config: Config = match toml::from_str(&contents) {
|
||||
let mut config: Config = match toml::from_str(&contents) {
|
||||
Ok(config) => config,
|
||||
Err(err) => {
|
||||
error!("Could not parse config file: {}", err.to_string());
|
||||
@@ -279,6 +350,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
config.path = Some(path.to_string());
|
||||
|
||||
CONFIG.store(Arc::new(config.clone()));
|
||||
|
||||
Ok(())
|
||||
@@ -296,5 +369,6 @@ mod test {
|
||||
assert_eq!(get_config().shards["1"].servers[0].0, "127.0.0.1");
|
||||
assert_eq!(get_config().shards["0"].servers[0].2, "primary");
|
||||
assert_eq!(get_config().query_router.default_role, "any");
|
||||
assert_eq!(get_config().path, Some("pgcat.toml".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,3 +20,8 @@ pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;
|
||||
|
||||
// ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows.
|
||||
pub const MESSAGE_TERMINATOR: u8 = 0;
|
||||
|
||||
//
|
||||
// Data types
|
||||
//
|
||||
pub const _OID_INT8: i32 = 20; // bigint
|
||||
|
||||
16
src/main.rs
16
src/main.rs
@@ -31,7 +31,6 @@ extern crate once_cell;
|
||||
extern crate serde;
|
||||
extern crate serde_derive;
|
||||
extern crate sqlparser;
|
||||
extern crate statsd;
|
||||
extern crate tokio;
|
||||
extern crate toml;
|
||||
|
||||
@@ -47,6 +46,7 @@ use tokio::{
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
mod admin;
|
||||
mod client;
|
||||
mod config;
|
||||
mod constants;
|
||||
@@ -112,15 +112,19 @@ async fn main() {
|
||||
|
||||
// Collect statistics and send them to StatsD
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let collector_tx = tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let mut stats_collector = Collector::new(rx, collector_tx);
|
||||
stats_collector.collect().await;
|
||||
});
|
||||
|
||||
// Connection pool for all shards and replicas
|
||||
let mut pool =
|
||||
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
|
||||
|
||||
let collector_tx = tx.clone();
|
||||
let addresses = pool.databases();
|
||||
tokio::task::spawn(async move {
|
||||
let mut stats_collector = Collector::new(rx, collector_tx);
|
||||
stats_collector.collect(addresses).await;
|
||||
});
|
||||
|
||||
// Connect to all servers and validate their versions.
|
||||
let server_info = match pool.validate().await {
|
||||
Ok(info) => info,
|
||||
Err(err) => {
|
||||
|
||||
162
src/messages.rs
162
src/messages.rs
@@ -8,9 +8,26 @@ use tokio::net::{
|
||||
TcpStream,
|
||||
};
|
||||
|
||||
use crate::errors::Error;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::errors::Error;
|
||||
/// Postgres data type mappings
|
||||
/// used in RowDescription ('T') message.
|
||||
pub enum DataType {
|
||||
Text,
|
||||
Int4,
|
||||
Numeric,
|
||||
}
|
||||
|
||||
impl From<&DataType> for i32 {
|
||||
fn from(data_type: &DataType) -> i32 {
|
||||
match data_type {
|
||||
DataType::Text => 25,
|
||||
DataType::Int4 => 23,
|
||||
DataType::Numeric => 1700,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tell the client that authentication handshake completed successfully.
|
||||
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
@@ -91,9 +108,8 @@ pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Resu
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse StartupMessage parameters.
|
||||
/// e.g. user, database, application_name, etc.
|
||||
pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||
/// Parse the params the server sends as a key/value format.
|
||||
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||
let mut result = HashMap::new();
|
||||
let mut buf = Vec::new();
|
||||
let mut tmp = String::new();
|
||||
@@ -115,7 +131,7 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
|
||||
|
||||
// Expect pairs of name and value
|
||||
// and at least one pair to be present.
|
||||
if buf.len() % 2 != 0 && buf.len() >= 2 {
|
||||
if buf.len() % 2 != 0 || buf.len() < 2 {
|
||||
return Err(Error::ClientBadStartup);
|
||||
}
|
||||
|
||||
@@ -127,6 +143,14 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
|
||||
i += 2;
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Parse StartupMessage parameters.
|
||||
/// e.g. user, database, application_name, etc.
|
||||
pub fn parse_startup(bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||
let result = parse_params(bytes)?;
|
||||
|
||||
// Minimum required parameters
|
||||
// I want to have the user at the very minimum, according to the protocol spec.
|
||||
if !result.contains_key("user") {
|
||||
@@ -252,68 +276,17 @@ pub async fn show_response(
|
||||
// 3. CommandComplete
|
||||
// 4. ReadyForQuery
|
||||
|
||||
// RowDescription
|
||||
let mut row_desc = BytesMut::new();
|
||||
|
||||
// Number of columns: 1
|
||||
row_desc.put_i16(1);
|
||||
|
||||
// Column name
|
||||
row_desc.put_slice(&format!("{}\0", name).as_bytes());
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i32(0);
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i16(0);
|
||||
|
||||
// Text
|
||||
row_desc.put_i32(25);
|
||||
|
||||
// Text size = variable (-1)
|
||||
row_desc.put_i16(-1);
|
||||
|
||||
// Type modifier: none that I know
|
||||
row_desc.put_i32(0);
|
||||
|
||||
// Format being used: text (0), binary (1)
|
||||
row_desc.put_i16(0);
|
||||
|
||||
// DataRow
|
||||
let mut data_row = BytesMut::new();
|
||||
|
||||
// Number of columns
|
||||
data_row.put_i16(1);
|
||||
|
||||
// Size of the column content (length of the string really)
|
||||
data_row.put_i32(value.len() as i32);
|
||||
|
||||
// The content
|
||||
data_row.put_slice(value.as_bytes());
|
||||
|
||||
// CommandComplete
|
||||
let mut command_complete = BytesMut::new();
|
||||
|
||||
// Number of rows returned (just one)
|
||||
command_complete.put_slice(&b"SELECT 1\0"[..]);
|
||||
|
||||
// The final messages sent to the client
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
// RowDescription
|
||||
res.put_u8(b'T');
|
||||
res.put_i32(row_desc.len() as i32 + 4);
|
||||
res.put(row_desc);
|
||||
res.put(row_description(&vec![(name, DataType::Text)]));
|
||||
|
||||
// DataRow
|
||||
res.put_u8(b'D');
|
||||
res.put_i32(data_row.len() as i32 + 4);
|
||||
res.put(data_row);
|
||||
res.put(data_row(&vec![value.to_string()]));
|
||||
|
||||
// CommandComplete
|
||||
res.put_u8(b'C');
|
||||
res.put_i32(command_complete.len() as i32 + 4);
|
||||
res.put(command_complete);
|
||||
res.put(command_complete("SELECT 1"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
@@ -323,6 +296,77 @@ pub async fn show_response(
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
|
||||
let mut res = BytesMut::new();
|
||||
let mut row_desc = BytesMut::new();
|
||||
|
||||
// how many colums we are storing
|
||||
row_desc.put_i16(columns.len() as i16);
|
||||
|
||||
for (name, data_type) in columns {
|
||||
// Column name
|
||||
row_desc.put_slice(&format!("{}\0", name).as_bytes());
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i32(0);
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i16(0);
|
||||
|
||||
// Text
|
||||
row_desc.put_i32(data_type.into());
|
||||
|
||||
// Text size = variable (-1)
|
||||
let type_size = match data_type {
|
||||
DataType::Text => -1,
|
||||
DataType::Int4 => 4,
|
||||
DataType::Numeric => -1,
|
||||
};
|
||||
|
||||
row_desc.put_i16(type_size);
|
||||
|
||||
// Type modifier: none that I know
|
||||
row_desc.put_i32(-1);
|
||||
|
||||
// Format being used: text (0), binary (1)
|
||||
row_desc.put_i16(0);
|
||||
}
|
||||
|
||||
res.put_u8(b'T');
|
||||
res.put_i32(row_desc.len() as i32 + 4);
|
||||
res.put(row_desc);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn data_row(row: &Vec<String>) -> BytesMut {
|
||||
let mut res = BytesMut::new();
|
||||
let mut data_row = BytesMut::new();
|
||||
|
||||
data_row.put_i16(row.len() as i16);
|
||||
|
||||
for column in row {
|
||||
let column = column.as_bytes();
|
||||
data_row.put_i32(column.len() as i32);
|
||||
data_row.put_slice(&column);
|
||||
}
|
||||
|
||||
res.put_u8(b'D');
|
||||
res.put_i32(data_row.len() as i32 + 4);
|
||||
res.put(data_row);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn command_complete(command: &str) -> BytesMut {
|
||||
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
||||
let mut res = BytesMut::new();
|
||||
res.put_u8(b'C');
|
||||
res.put_i32(cmd.len() as i32 + 4);
|
||||
res.put(cmd);
|
||||
res
|
||||
}
|
||||
|
||||
/// Write all data in the buffer to the TcpStream.
|
||||
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
||||
match stream.write_all(&buf).await {
|
||||
|
||||
84
src/pool.rs
84
src/pool.rs
@@ -38,6 +38,7 @@ impl ConnectionPool {
|
||||
let mut shards = Vec::new();
|
||||
let mut addresses = Vec::new();
|
||||
let mut banlist = Vec::new();
|
||||
let mut address_id = 0;
|
||||
let mut shard_ids = config
|
||||
.shards
|
||||
.clone()
|
||||
@@ -49,7 +50,8 @@ impl ConnectionPool {
|
||||
for shard_idx in shard_ids {
|
||||
let shard = &config.shards[&shard_idx];
|
||||
let mut pools = Vec::new();
|
||||
let mut replica_addresses = Vec::new();
|
||||
let mut servers = Vec::new();
|
||||
let mut replica_number = 0;
|
||||
|
||||
for server in shard.servers.iter() {
|
||||
let role = match server.2.as_ref() {
|
||||
@@ -62,12 +64,20 @@ impl ConnectionPool {
|
||||
};
|
||||
|
||||
let address = Address {
|
||||
id: address_id,
|
||||
host: server.0.clone(),
|
||||
port: server.1.to_string(),
|
||||
role: role,
|
||||
replica_number,
|
||||
shard: shard_idx.parse::<usize>().unwrap(),
|
||||
};
|
||||
|
||||
address_id += 1;
|
||||
|
||||
if role == Role::Replica {
|
||||
replica_number += 1;
|
||||
}
|
||||
|
||||
let manager = ServerPool::new(
|
||||
address.clone(),
|
||||
config.user.clone(),
|
||||
@@ -87,11 +97,11 @@ impl ConnectionPool {
|
||||
.unwrap();
|
||||
|
||||
pools.push(pool);
|
||||
replica_addresses.push(address);
|
||||
servers.push(address);
|
||||
}
|
||||
|
||||
shards.push(pools);
|
||||
addresses.push(replica_addresses);
|
||||
addresses.push(servers);
|
||||
banlist.push(HashMap::new());
|
||||
}
|
||||
|
||||
@@ -115,9 +125,13 @@ impl ConnectionPool {
|
||||
pub async fn validate(&mut self) -> Result<BytesMut, Error> {
|
||||
let mut server_infos = Vec::new();
|
||||
|
||||
let stats = self.stats.clone();
|
||||
for shard in 0..self.shards() {
|
||||
for _ in 0..self.servers(shard) {
|
||||
let connection = match self.get(shard, None).await {
|
||||
// To keep stats consistent.
|
||||
let fake_process_id = 0;
|
||||
|
||||
let connection = match self.get(shard, None, fake_process_id).await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
error!("Shard {} down or misconfigured: {:?}", shard, err);
|
||||
@@ -126,10 +140,24 @@ impl ConnectionPool {
|
||||
};
|
||||
|
||||
let mut proxy = connection.0;
|
||||
let _address = connection.1;
|
||||
let address = connection.1;
|
||||
let server = &mut *proxy;
|
||||
|
||||
server_infos.push(server.server_info());
|
||||
let server_info = server.server_info();
|
||||
|
||||
stats.client_disconnecting(fake_process_id, address.id);
|
||||
|
||||
if server_infos.len() > 0 {
|
||||
// Compare against the last server checked.
|
||||
if server_info != server_infos[server_infos.len() - 1] {
|
||||
warn!(
|
||||
"{:?} has different server configuration than the last server",
|
||||
address
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
server_infos.push(server_info);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,6 +175,7 @@ impl ConnectionPool {
|
||||
&mut self,
|
||||
shard: usize,
|
||||
role: Option<Role>,
|
||||
process_id: i32,
|
||||
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
|
||||
let now = Instant::now();
|
||||
let addresses = &self.addresses[shard];
|
||||
@@ -182,6 +211,8 @@ impl ConnectionPool {
|
||||
let index = self.round_robin % addresses.len();
|
||||
let address = &addresses[index];
|
||||
|
||||
self.stats.client_waiting(process_id, address.id);
|
||||
|
||||
// Make sure you're getting a primary or a replica
|
||||
// as per request. If no specific role is requested, the first
|
||||
// available will be chosen.
|
||||
@@ -201,6 +232,9 @@ impl ConnectionPool {
|
||||
Err(err) => {
|
||||
error!("Banning replica {}, error: {:?}", index, err);
|
||||
self.ban(address, shard);
|
||||
self.stats.client_disconnecting(process_id, address.id);
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -209,7 +243,7 @@ impl ConnectionPool {
|
||||
let server = &mut *conn;
|
||||
let healthcheck_timeout = get_config().general.healthcheck_timeout;
|
||||
|
||||
self.stats.server_tested(server.process_id());
|
||||
self.stats.server_tested(server.process_id(), address.id);
|
||||
|
||||
match tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(healthcheck_timeout),
|
||||
@@ -220,8 +254,9 @@ impl ConnectionPool {
|
||||
// Check if health check succeeded
|
||||
Ok(res) => match res {
|
||||
Ok(_) => {
|
||||
self.stats.checkout_time(now.elapsed().as_micros());
|
||||
self.stats.server_idle(conn.process_id());
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_idle(conn.process_id(), address.id);
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
Err(_) => {
|
||||
@@ -230,6 +265,9 @@ impl ConnectionPool {
|
||||
server.mark_bad();
|
||||
|
||||
self.ban(address, shard);
|
||||
self.stats.client_disconnecting(process_id, address.id);
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
@@ -240,6 +278,9 @@ impl ConnectionPool {
|
||||
server.mark_bad();
|
||||
|
||||
self.ban(address, shard);
|
||||
self.stats.client_disconnecting(process_id, address.id);
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -324,6 +365,22 @@ impl ConnectionPool {
|
||||
pub fn servers(&self, shard: usize) -> usize {
|
||||
self.addresses[shard].len()
|
||||
}
|
||||
|
||||
pub fn databases(&self) -> usize {
|
||||
let mut databases = 0;
|
||||
for shard in 0..self.shards() {
|
||||
databases += self.servers(shard);
|
||||
}
|
||||
databases
|
||||
}
|
||||
|
||||
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
|
||||
self.databases[shard][server].state()
|
||||
}
|
||||
|
||||
pub fn address(&self, shard: usize, server: usize) -> &Address {
|
||||
&self.addresses[shard][server]
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ServerPool {
|
||||
@@ -361,13 +418,14 @@ impl ManageConnection for ServerPool {
|
||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||
info!(
|
||||
"Creating a new connection to {:?} using user {:?}",
|
||||
self.address, self.user.name
|
||||
self.address.name(),
|
||||
self.user.name
|
||||
);
|
||||
|
||||
// Put a temporary process_id into the stats
|
||||
// for server login.
|
||||
let process_id = rand::random::<i32>();
|
||||
self.stats.server_login(process_id);
|
||||
self.stats.server_login(process_id, self.address.id);
|
||||
|
||||
match Server::startup(
|
||||
&self.address,
|
||||
@@ -380,12 +438,12 @@ impl ManageConnection for ServerPool {
|
||||
{
|
||||
Ok(conn) => {
|
||||
// Remove the temporary process_id from the stats.
|
||||
self.stats.server_disconnecting(process_id);
|
||||
self.stats.server_disconnecting(process_id, self.address.id);
|
||||
Ok(conn)
|
||||
}
|
||||
Err(err) => {
|
||||
// Remove the temporary process_id from the stats.
|
||||
self.stats.server_disconnecting(process_id);
|
||||
self.stats.server_disconnecting(process_id, self.address.id);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use sqlparser::parser::Parser;
|
||||
|
||||
const CUSTOM_SQL_REGEXES: [&str; 5] = [
|
||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||
r"(?i)^ *SET SHARD TO '?([0-9]+)'? *;? *$",
|
||||
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
||||
r"(?i)^ *SHOW SHARD *;? *$",
|
||||
r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
|
||||
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
||||
@@ -192,7 +192,10 @@ impl QueryRouter {
|
||||
}
|
||||
|
||||
Command::SetShard => {
|
||||
self.active_shard = Some(value.parse::<usize>().unwrap());
|
||||
self.active_shard = match value.to_ascii_uppercase().as_ref() {
|
||||
"ANY" => Some(rand::random::<usize>() % self.shards),
|
||||
_ => Some(value.parse::<usize>().unwrap()),
|
||||
};
|
||||
}
|
||||
|
||||
Command::SetServerRole => {
|
||||
|
||||
@@ -268,7 +268,8 @@ impl Server {
|
||||
|
||||
/// Send messages to the server from the client.
|
||||
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
||||
self.stats.data_sent(messages.len());
|
||||
self.stats
|
||||
.data_sent(messages.len(), self.process_id, self.address.id);
|
||||
|
||||
match write_all_half(&mut self.write, messages).await {
|
||||
Ok(_) => Ok(()),
|
||||
@@ -374,7 +375,8 @@ impl Server {
|
||||
let bytes = self.buffer.clone();
|
||||
|
||||
// Keep track of how much data we got from the server for stats.
|
||||
self.stats.data_received(bytes.len());
|
||||
self.stats
|
||||
.data_received(bytes.len(), self.process_id, self.address.id);
|
||||
|
||||
// Clear the buffer for next query.
|
||||
self.buffer.clear();
|
||||
@@ -470,7 +472,8 @@ impl Drop for Server {
|
||||
/// the socket is in non-blocking mode, so it may not be ready
|
||||
/// for a write.
|
||||
fn drop(&mut self) {
|
||||
self.stats.server_disconnecting(self.process_id());
|
||||
self.stats
|
||||
.server_disconnecting(self.process_id(), self.address.id);
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(4);
|
||||
bytes.put_u8(b'X');
|
||||
|
||||
308
src/stats.rs
308
src/stats.rs
@@ -1,12 +1,20 @@
|
||||
/// Statistics and reporting.
|
||||
use log::info;
|
||||
use statsd::Client;
|
||||
/// Events collector and publisher.
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
use std::collections::HashMap;
|
||||
// Latest stats updated every second; used in SHOW STATS and other admin commands.
|
||||
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
|
||||
Lazy::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
use crate::config::get_config;
|
||||
// Statistics period used for average calculations.
|
||||
// 15 seconds.
|
||||
static STAT_PERIOD: u64 = 15000;
|
||||
|
||||
/// The names for the events reported
|
||||
/// to the statistics collector.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum EventName {
|
||||
CheckoutTime,
|
||||
@@ -23,201 +31,266 @@ enum EventName {
|
||||
ServerTested,
|
||||
ServerLogin,
|
||||
ServerDisconnecting,
|
||||
FlushStatsToStatsD,
|
||||
UpdateStats,
|
||||
UpdateAverages,
|
||||
}
|
||||
|
||||
/// Event data sent to the collector
|
||||
/// from clients and servers.
|
||||
#[derive(Debug)]
|
||||
pub struct Event {
|
||||
/// The name of the event being reported.
|
||||
name: EventName,
|
||||
|
||||
/// The value being reported. Meaning differs based on event name.
|
||||
value: i64,
|
||||
process_id: Option<i32>,
|
||||
|
||||
/// The client or server connection reporting the event.
|
||||
process_id: i32,
|
||||
|
||||
/// The server the client is connected to.
|
||||
address_id: usize,
|
||||
}
|
||||
|
||||
/// The statistics reporter. An instance is given
|
||||
/// to each possible source of statistics,
|
||||
/// e.g. clients, servers, connection pool.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Reporter {
|
||||
tx: Sender<Event>,
|
||||
}
|
||||
|
||||
impl Reporter {
|
||||
/// Create a new Reporter instance.
|
||||
pub fn new(tx: Sender<Event>) -> Reporter {
|
||||
Reporter { tx: tx }
|
||||
}
|
||||
|
||||
pub fn query(&self) {
|
||||
/// Report a query executed by a client against
|
||||
/// a server identified by the `address_id`.
|
||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::Query,
|
||||
value: 1,
|
||||
process_id: None,
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn transaction(&self) {
|
||||
/// Report a transaction executed by a client against
|
||||
/// a server identified by the `address_id`.
|
||||
pub fn transaction(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::Transaction,
|
||||
value: 1,
|
||||
process_id: None,
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn data_sent(&self, amount: usize) {
|
||||
/// Report data sent to a server identified by `address_id`.
|
||||
/// The `amount` is measured in bytes.
|
||||
pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::DataSent,
|
||||
value: amount as i64,
|
||||
process_id: None,
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn data_received(&self, amount: usize) {
|
||||
/// Report data received from a server identified by `address_id`.
|
||||
/// The `amount` is measured in bytes.
|
||||
pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::DataReceived,
|
||||
value: amount as i64,
|
||||
process_id: None,
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn checkout_time(&self, ms: u128) {
|
||||
/// Time spent waiting to get a healthy connection from the pool
|
||||
/// for a server identified by `address_id`.
|
||||
/// Measured in milliseconds.
|
||||
pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::CheckoutTime,
|
||||
value: ms as i64,
|
||||
process_id: None,
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_waiting(&self, process_id: i32) {
|
||||
/// Reports a client identified by `process_id` waiting for a connection
|
||||
/// to a server identified by `address_id`.
|
||||
pub fn client_waiting(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ClientWaiting,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_active(&self, process_id: i32) {
|
||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||
/// to a server identified by `address_id` and is about to query the server.
|
||||
pub fn client_active(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ClientActive,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_idle(&self, process_id: i32) {
|
||||
/// Reports a client identified by `process_id` is done querying the server
|
||||
/// identified by `address_id` and is no longer active.
|
||||
pub fn client_idle(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ClientIdle,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_disconnecting(&self, process_id: i32) {
|
||||
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
||||
/// The last server it was connected to is identified by `address_id`.
|
||||
pub fn client_disconnecting(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ClientDisconnecting,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_active(&self, process_id: i32) {
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
/// a configured server identified by `address_id` is actively used
|
||||
/// by a client.
|
||||
pub fn server_active(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ServerActive,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_idle(&self, process_id: i32) {
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
/// a configured server identified by `address_id` is no longer
|
||||
/// actively used by a client and is now idle.
|
||||
pub fn server_idle(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ServerIdle,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_login(&self, process_id: i32) {
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
/// a configured server identified by `address_id` is attempting
|
||||
/// to login.
|
||||
pub fn server_login(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ServerLogin,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_tested(&self, process_id: i32) {
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
/// a configured server identified by `address_id` is being
|
||||
/// tested before being given to a client.
|
||||
pub fn server_tested(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ServerTested,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_disconnecting(&self, process_id: i32) {
|
||||
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
||||
/// The configured server it was connected to is identified by `address_id`.
|
||||
pub fn server_disconnecting(&self, process_id: i32, address_id: usize) {
|
||||
let event = Event {
|
||||
name: EventName::ServerDisconnecting,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
process_id: process_id,
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
// pub fn flush_to_statsd(&self) {
|
||||
// let event = Event {
|
||||
// name: EventName::FlushStatsToStatsD,
|
||||
// value: 0,
|
||||
// process_id: None,
|
||||
// };
|
||||
|
||||
// let _ = self.tx.try_send(event);
|
||||
// }
|
||||
}
|
||||
|
||||
/// The statistics collector which is receiving statistics
|
||||
/// from clients, servers, and the connection pool. There is
|
||||
/// only one collector (kind of like a singleton).
|
||||
/// The collector can trigger events on its own, e.g.
|
||||
/// it updates aggregates every second and averages every
|
||||
/// 15 seconds.
|
||||
pub struct Collector {
|
||||
rx: Receiver<Event>,
|
||||
tx: Sender<Event>,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl Collector {
|
||||
/// Create a new collector instance. There should only be one instance
|
||||
/// at a time. This is ensured by mpsc which allows only one receiver.
|
||||
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
|
||||
Collector {
|
||||
rx,
|
||||
tx,
|
||||
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
|
||||
}
|
||||
Collector { rx, tx }
|
||||
}
|
||||
|
||||
pub async fn collect(&mut self) {
|
||||
/// The statistics collection handler. It will collect statistics
|
||||
/// for `address_id`s starting at 0 up to `addresses`.
|
||||
pub async fn collect(&mut self, addresses: usize) {
|
||||
info!("Events reporter started");
|
||||
|
||||
let mut stats = HashMap::from([
|
||||
let stats_template = HashMap::from([
|
||||
("total_query_count", 0),
|
||||
("total_xact_count", 0),
|
||||
("total_sent", 0),
|
||||
("total_received", 0),
|
||||
("total_xact_time", 0),
|
||||
("total_query_time", 0),
|
||||
("total_wait_time", 0),
|
||||
("avg_xact_time", 0),
|
||||
("avg_query_time", 0),
|
||||
("avg_xact_count", 0),
|
||||
("avg_sent", 0),
|
||||
("avg_received", 0),
|
||||
("avg_wait_time", 0),
|
||||
("maxwait_us", 0),
|
||||
("maxwait", 0),
|
||||
("cl_waiting", 0),
|
||||
@@ -229,21 +302,51 @@ impl Collector {
|
||||
("sv_tested", 0),
|
||||
]);
|
||||
|
||||
let mut client_server_states: HashMap<i32, EventName> = HashMap::new();
|
||||
let tx = self.tx.clone();
|
||||
let mut stats = HashMap::new();
|
||||
|
||||
// Stats saved after each iteration of the flush event. Used in calculation
|
||||
// of averages in the last flush period.
|
||||
let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new();
|
||||
|
||||
// Track which state the client and server are at any given time.
|
||||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
||||
|
||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||
let tx = self.tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000));
|
||||
let mut interval =
|
||||
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::FlushStatsToStatsD,
|
||||
value: 0,
|
||||
process_id: None,
|
||||
});
|
||||
for address_id in 0..addresses {
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::UpdateStats,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let tx = self.tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let mut interval =
|
||||
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
for address_id in 0..addresses {
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::UpdateAverages,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// The collector loop
|
||||
loop {
|
||||
let stat = match self.rx.recv().await {
|
||||
Some(stat) => stat,
|
||||
@@ -253,6 +356,14 @@ impl Collector {
|
||||
}
|
||||
};
|
||||
|
||||
let stats = stats
|
||||
.entry(stat.address_id)
|
||||
.or_insert(stats_template.clone());
|
||||
let client_server_states = client_server_states
|
||||
.entry(stat.address_id)
|
||||
.or_insert(HashMap::new());
|
||||
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
||||
|
||||
// Some are counters, some are gauges...
|
||||
match stat.name {
|
||||
EventName::Query => {
|
||||
@@ -280,10 +391,11 @@ impl Collector {
|
||||
*counter += stat.value;
|
||||
|
||||
let counter = stats.entry("maxwait_us").or_insert(0);
|
||||
let mic_part = stat.value % 1_000_000;
|
||||
|
||||
// Report max time here
|
||||
if stat.value > *counter {
|
||||
*counter = stat.value;
|
||||
if mic_part > *counter {
|
||||
*counter = mic_part;
|
||||
}
|
||||
|
||||
let counter = stats.entry("maxwait").or_insert(0);
|
||||
@@ -301,15 +413,16 @@ impl Collector {
|
||||
| EventName::ServerIdle
|
||||
| EventName::ServerTested
|
||||
| EventName::ServerLogin => {
|
||||
client_server_states.insert(stat.process_id.unwrap(), stat.name);
|
||||
client_server_states.insert(stat.process_id, stat.name);
|
||||
}
|
||||
|
||||
EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
|
||||
client_server_states.remove(&stat.process_id.unwrap());
|
||||
client_server_states.remove(&stat.process_id);
|
||||
}
|
||||
|
||||
EventName::FlushStatsToStatsD => {
|
||||
for (_, state) in &client_server_states {
|
||||
EventName::UpdateStats => {
|
||||
// Calculate connection states
|
||||
for (_, state) in client_server_states.iter() {
|
||||
match state {
|
||||
EventName::ClientActive => {
|
||||
let counter = stats.entry("cl_active").or_insert(0);
|
||||
@@ -321,11 +434,6 @@ impl Collector {
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ClientIdle => {
|
||||
let counter = stats.entry("cl_idle").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerIdle => {
|
||||
let counter = stats.entry("sv_idle").or_insert(0);
|
||||
*counter += 1;
|
||||
@@ -346,22 +454,64 @@ impl Collector {
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ClientIdle => {
|
||||
let counter = stats.entry("cl_idle").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
_ => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
info!("{:?}", stats);
|
||||
|
||||
let mut pipeline = self.client.pipeline();
|
||||
|
||||
for (key, value) in stats.iter_mut() {
|
||||
pipeline.gauge(key, *value as f64);
|
||||
*value = 0;
|
||||
// Update latest stats used in SHOW STATS
|
||||
let mut guard = LATEST_STATS.lock();
|
||||
for (key, value) in stats.iter() {
|
||||
let entry = guard.entry(stat.address_id).or_insert(HashMap::new());
|
||||
entry.insert(key.to_string(), value.clone());
|
||||
}
|
||||
|
||||
pipeline.send(&self.client);
|
||||
// These are re-calculated every iteration of the loop, so we don't want to add values
|
||||
// from the last iteration.
|
||||
for stat in &[
|
||||
"cl_active",
|
||||
"cl_waiting",
|
||||
"cl_idle",
|
||||
"sv_idle",
|
||||
"sv_active",
|
||||
"sv_tested",
|
||||
"sv_login",
|
||||
"maxwait",
|
||||
"maxwait_us",
|
||||
] {
|
||||
stats.insert(stat, 0);
|
||||
}
|
||||
}
|
||||
|
||||
EventName::UpdateAverages => {
|
||||
// Calculate averages
|
||||
for stat in &[
|
||||
"avg_query_count",
|
||||
"avgxact_count",
|
||||
"avg_sent",
|
||||
"avg_received",
|
||||
"avg_wait_time",
|
||||
] {
|
||||
let total_name = stat.replace("avg_", "total_");
|
||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
||||
|
||||
stats.insert(stat, avg);
|
||||
*old_value = new_value;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a snapshot of statistics. Updated once a second
|
||||
/// by the `Collector`.
|
||||
pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
|
||||
LATEST_STATS.lock().clone()
|
||||
}
|
||||
|
||||
28
tests/pgbench/simple.sql
Normal file
28
tests/pgbench/simple.sql
Normal file
@@ -0,0 +1,28 @@
|
||||
|
||||
-- \setrandom aid 1 :naccounts
|
||||
\set aid random(1, 100000)
|
||||
-- \setrandom bid 1 :nbranches
|
||||
\set bid random(1, 100000)
|
||||
-- \setrandom tid 1 :ntellers
|
||||
\set tid random(1, 100000)
|
||||
-- \setrandom delta -5000 5000
|
||||
\set delta random(-5000,5000)
|
||||
|
||||
\set shard random(0, 2)
|
||||
|
||||
SET SHARD TO :shard;
|
||||
|
||||
BEGIN;
|
||||
|
||||
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
|
||||
|
||||
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
|
||||
|
||||
UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
|
||||
|
||||
UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
|
||||
|
||||
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
|
||||
|
||||
END;
|
||||
|
||||
Reference in New Issue
Block a user