mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-24 01:36:29 +00:00
Compare commits
12 Commits
query-rout
...
v0.1.0-bet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
341ebf4123 | ||
|
|
35828a0a8c | ||
|
|
1e8fa110ae | ||
|
|
d4186b7815 | ||
|
|
aaeef69d59 | ||
|
|
b21e0f4a7e | ||
|
|
eb1473060e | ||
|
|
26f75f8d5d | ||
|
|
99d65fc475 | ||
|
|
206fdc9769 | ||
|
|
f74101cdfe | ||
|
|
8e0682482d |
@@ -13,6 +13,9 @@ function start_pgcat() {
|
|||||||
|
|
||||||
# Setup the database with shards and user
|
# Setup the database with shards and user
|
||||||
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
||||||
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||||
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||||
|
|
||||||
# Install Toxiproxy to simulate a downed/slow database
|
# Install Toxiproxy to simulate a downed/slow database
|
||||||
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
|
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
|
||||||
@@ -28,9 +31,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica
|
|||||||
start_pgcat "info"
|
start_pgcat "info"
|
||||||
|
|
||||||
# pgbench test
|
# pgbench test
|
||||||
pgbench -i -h 127.0.0.1 -p 6432 && \
|
pgbench -i -h 127.0.0.1 -p 6432
|
||||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \
|
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
|
||||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
||||||
|
|
||||||
# COPY TO STDOUT test
|
# COPY TO STDOUT test
|
||||||
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null
|
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null
|
||||||
@@ -57,6 +60,17 @@ cd tests/ruby && \
|
|||||||
ruby tests.rb && \
|
ruby tests.rb && \
|
||||||
cd ../..
|
cd ../..
|
||||||
|
|
||||||
|
# Admin tests
|
||||||
|
psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
||||||
|
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
||||||
|
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
||||||
|
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
||||||
|
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
|
||||||
|
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
|
||||||
|
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
|
||||||
|
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
||||||
|
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||||
|
|
||||||
# Start PgCat in debug to demonstrate failover better
|
# Start PgCat in debug to demonstrate failover better
|
||||||
start_pgcat "debug"
|
start_pgcat "debug"
|
||||||
|
|
||||||
|
|||||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -371,7 +371,6 @@ dependencies = [
|
|||||||
"serde_derive",
|
"serde_derive",
|
||||||
"sha-1",
|
"sha-1",
|
||||||
"sqlparser",
|
"sqlparser",
|
||||||
"statsd",
|
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
]
|
]
|
||||||
@@ -542,15 +541,6 @@ dependencies = [
|
|||||||
"log",
|
"log",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "statsd"
|
|
||||||
version = "0.15.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "df1efceb4bf2c0b5ebec94354285a43bbbed1375605bdf2ebe4132299434a330"
|
|
||||||
dependencies = [
|
|
||||||
"rand",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.86"
|
version = "1.0.86"
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ serde_derive = "1"
|
|||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
statsd = "0.15"
|
|
||||||
sqlparser = "0.14"
|
sqlparser = "0.14"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
|
|||||||
25
README.md
25
README.md
@@ -133,6 +133,31 @@ All servers are checked with a `SELECT 1` query before being given to a client.
|
|||||||
|
|
||||||
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
||||||
|
|
||||||
|
Failover behavior can get pretty interesting (read complex) when multiple configurations and factors are involved. The table below will try to explain what PgCat does in each scenario:
|
||||||
|
|
||||||
|
| **Query** | **`SET SERVER ROLE TO`** | **`query_parser_enabled`** | **`primary_reads_enabled`** | **Target state** | **Outcome** |
|
||||||
|
|---------------------------|--------------------------|----------------------------|-----------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||||
|
| Read query, i.e. `SELECT` | unset (any) | false | false | up | Query is routed to the first instance in the round-robin loop. |
|
||||||
|
| Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the round-robin loop. |
|
||||||
|
| Read query | unset (any) | true | true | up | Query is routed to the first instance in the round-robin loop. |
|
||||||
|
| Read query | replica | false | false | up | Query is routed to the first replica instance in the round-robin loop. |
|
||||||
|
| Read query | primary | false | false | up | Query is routed to the primary. |
|
||||||
|
| Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the round-robin loop is attempted. |
|
||||||
|
| Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
|
||||||
|
| Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the round-robin loop. |
|
||||||
|
| Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
|
||||||
|
| Read query | primary | false | false | down | The query is attempted against the primary and fails. The client receives an error. |
|
||||||
|
| | | | | | |
|
||||||
|
| Write query e.g. `INSERT` | unset (any) | false | false | up | The query is attempted against the first available instance in the round-robin loop. If the instance is a replica, the query fails and the client receives an error. |
|
||||||
|
| Write query | unset (any) | true | false | up | The query is routed to the primary. |
|
||||||
|
| Write query | unset (any) | true | true | up | The query is routed to the primary. |
|
||||||
|
| Write query | primary | false | false | up | The query is routed to the primary. |
|
||||||
|
| Write query | replica | false | false | up | The query is routed to the replica and fails. The client receives an error. |
|
||||||
|
| Write query | unset (any) | true | false | down | The query is routed to the primary and fails. The client receives an error. |
|
||||||
|
| Write query | unset (any) | true | true | down | The query is routed to the primary and fails. The client receives an error. |
|
||||||
|
| Write query | primary | false | false | down | The query is routed to the primary and fails. The client receives an error. |
|
||||||
|
| | | | | | |
|
||||||
|
|
||||||
### Sharding
|
### Sharding
|
||||||
We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
|
We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
|
||||||
|
|
||||||
|
|||||||
386
src/admin.rs
Normal file
386
src/admin.rs
Normal file
@@ -0,0 +1,386 @@
|
|||||||
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
|
use log::{info, trace};
|
||||||
|
use tokio::net::tcp::OwnedWriteHalf;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use crate::config::{get_config, parse};
|
||||||
|
use crate::errors::Error;
|
||||||
|
use crate::messages::*;
|
||||||
|
use crate::pool::ConnectionPool;
|
||||||
|
use crate::stats::get_stats;
|
||||||
|
|
||||||
|
/// Handle admin client
|
||||||
|
pub async fn handle_admin(
|
||||||
|
stream: &mut OwnedWriteHalf,
|
||||||
|
mut query: BytesMut,
|
||||||
|
pool: ConnectionPool,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let code = query.get_u8() as char;
|
||||||
|
|
||||||
|
if code != 'Q' {
|
||||||
|
return Err(Error::ProtocolSyncError);
|
||||||
|
}
|
||||||
|
|
||||||
|
let len = query.get_i32() as usize;
|
||||||
|
let query = String::from_utf8_lossy(&query[..len - 5])
|
||||||
|
.to_string()
|
||||||
|
.to_ascii_uppercase();
|
||||||
|
|
||||||
|
trace!("Admin query: {}", query);
|
||||||
|
|
||||||
|
if query.starts_with("SHOW STATS") {
|
||||||
|
trace!("SHOW STATS");
|
||||||
|
show_stats(stream, &pool).await
|
||||||
|
} else if query.starts_with("RELOAD") {
|
||||||
|
trace!("RELOAD");
|
||||||
|
reload(stream).await
|
||||||
|
} else if query.starts_with("SHOW CONFIG") {
|
||||||
|
trace!("SHOW CONFIG");
|
||||||
|
show_config(stream).await
|
||||||
|
} else if query.starts_with("SHOW DATABASES") {
|
||||||
|
trace!("SHOW DATABASES");
|
||||||
|
show_databases(stream, &pool).await
|
||||||
|
} else if query.starts_with("SHOW POOLS") {
|
||||||
|
trace!("SHOW POOLS");
|
||||||
|
show_pools(stream, &pool).await
|
||||||
|
} else if query.starts_with("SHOW LISTS") {
|
||||||
|
trace!("SHOW LISTS");
|
||||||
|
show_lists(stream, &pool).await
|
||||||
|
} else if query.starts_with("SHOW VERSION") {
|
||||||
|
trace!("SHOW VERSION");
|
||||||
|
show_version(stream).await
|
||||||
|
} else if query.starts_with("SET ") {
|
||||||
|
trace!("SET");
|
||||||
|
ignore_set(stream).await
|
||||||
|
} else {
|
||||||
|
error_response(stream, "Unsupported query against the admin database").await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SHOW LISTS
|
||||||
|
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
|
let stats = get_stats();
|
||||||
|
|
||||||
|
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
|
||||||
|
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
res.put(row_description(&columns));
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
"databases".to_string(),
|
||||||
|
(pool.databases() + 1).to_string(), // see comment below
|
||||||
|
]));
|
||||||
|
res.put(data_row(&vec!["users".to_string(), "1".to_string()]));
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
"pools".to_string(),
|
||||||
|
(pool.databases() + 1).to_string(), // +1 for the pgbouncer admin db pool which isn't real
|
||||||
|
])); // but admin tools that work with pgbouncer want this
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
"free_clients".to_string(),
|
||||||
|
stats
|
||||||
|
.keys()
|
||||||
|
.map(|address_id| stats[&address_id]["cl_idle"])
|
||||||
|
.sum::<i64>()
|
||||||
|
.to_string(),
|
||||||
|
]));
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
"used_clients".to_string(),
|
||||||
|
stats
|
||||||
|
.keys()
|
||||||
|
.map(|address_id| stats[&address_id]["cl_active"])
|
||||||
|
.sum::<i64>()
|
||||||
|
.to_string(),
|
||||||
|
]));
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
"login_clients".to_string(),
|
||||||
|
"0".to_string(),
|
||||||
|
]));
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
"free_servers".to_string(),
|
||||||
|
stats
|
||||||
|
.keys()
|
||||||
|
.map(|address_id| stats[&address_id]["sv_idle"])
|
||||||
|
.sum::<i64>()
|
||||||
|
.to_string(),
|
||||||
|
]));
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
"used_servers".to_string(),
|
||||||
|
stats
|
||||||
|
.keys()
|
||||||
|
.map(|address_id| stats[&address_id]["sv_active"])
|
||||||
|
.sum::<i64>()
|
||||||
|
.to_string(),
|
||||||
|
]));
|
||||||
|
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
|
||||||
|
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
|
||||||
|
res.put(data_row(&vec!["dns_queries".to_string(), "0".to_string()]));
|
||||||
|
res.put(data_row(&vec!["dns_pending".to_string(), "0".to_string()]));
|
||||||
|
|
||||||
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SHOW VERSION
|
||||||
|
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
|
||||||
|
res.put(row_description(&vec![("version", DataType::Text)]));
|
||||||
|
res.put(data_row(&vec!["PgCat 0.1.0".to_string()]));
|
||||||
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SHOW POOLS
|
||||||
|
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
|
let stats = get_stats();
|
||||||
|
let config = {
|
||||||
|
let guard = get_config();
|
||||||
|
&*guard.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
let columns = vec![
|
||||||
|
("database", DataType::Text),
|
||||||
|
("user", DataType::Text),
|
||||||
|
("cl_active", DataType::Numeric),
|
||||||
|
("cl_waiting", DataType::Numeric),
|
||||||
|
("cl_cancel_req", DataType::Numeric),
|
||||||
|
("sv_active", DataType::Numeric),
|
||||||
|
("sv_idle", DataType::Numeric),
|
||||||
|
("sv_used", DataType::Numeric),
|
||||||
|
("sv_tested", DataType::Numeric),
|
||||||
|
("sv_login", DataType::Numeric),
|
||||||
|
("maxwait", DataType::Numeric),
|
||||||
|
("maxwait_us", DataType::Numeric),
|
||||||
|
("pool_mode", DataType::Text),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
|
for shard in 0..pool.shards() {
|
||||||
|
for server in 0..pool.servers(shard) {
|
||||||
|
let address = pool.address(shard, server);
|
||||||
|
let stats = match stats.get(&address.id) {
|
||||||
|
Some(stats) => stats.clone(),
|
||||||
|
None => HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut row = vec![address.name(), config.user.name.clone()];
|
||||||
|
|
||||||
|
for column in &columns[2..columns.len() - 1] {
|
||||||
|
let value = stats.get(column.0).unwrap_or(&0).to_string();
|
||||||
|
row.push(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
row.push(config.general.pool_mode.to_string());
|
||||||
|
res.put(data_row(&row));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SHOW DATABASES
|
||||||
|
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
|
let guard = get_config();
|
||||||
|
let config = &*guard.clone();
|
||||||
|
drop(guard);
|
||||||
|
|
||||||
|
// Columns
|
||||||
|
let columns = vec![
|
||||||
|
("name", DataType::Text),
|
||||||
|
("host", DataType::Text),
|
||||||
|
("port", DataType::Text),
|
||||||
|
("database", DataType::Text),
|
||||||
|
("force_user", DataType::Text),
|
||||||
|
("pool_size", DataType::Int4),
|
||||||
|
("min_pool_size", DataType::Int4),
|
||||||
|
("reserve_pool", DataType::Int4),
|
||||||
|
("pool_mode", DataType::Text),
|
||||||
|
("max_connections", DataType::Int4),
|
||||||
|
("current_connections", DataType::Int4),
|
||||||
|
("paused", DataType::Int4),
|
||||||
|
("disabled", DataType::Int4),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
|
||||||
|
// RowDescription
|
||||||
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
|
for shard in 0..pool.shards() {
|
||||||
|
let database_name = &config.shards[&shard.to_string()].database;
|
||||||
|
|
||||||
|
for server in 0..pool.servers(shard) {
|
||||||
|
let address = pool.address(shard, server);
|
||||||
|
let pool_state = pool.pool_state(shard, server);
|
||||||
|
|
||||||
|
res.put(data_row(&vec![
|
||||||
|
address.name(), // name
|
||||||
|
address.host.to_string(), // host
|
||||||
|
address.port.to_string(), // port
|
||||||
|
database_name.to_string(), // database
|
||||||
|
config.user.name.to_string(), // force_user
|
||||||
|
config.general.pool_size.to_string(), // pool_size
|
||||||
|
"0".to_string(), // min_pool_size
|
||||||
|
"0".to_string(), // reserve_pool
|
||||||
|
config.general.pool_mode.to_string(), // pool_mode
|
||||||
|
config.general.pool_size.to_string(), // max_connections
|
||||||
|
pool_state.connections.to_string(), // current_connections
|
||||||
|
"0".to_string(), // paused
|
||||||
|
"0".to_string(), // disabled
|
||||||
|
]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
|
// ReadyForQuery
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ignore any SET commands the client sends.
|
||||||
|
/// This is common initialization done by ORMs.
|
||||||
|
async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||||
|
custom_protocol_response_ok(stream, "SET").await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RELOAD
|
||||||
|
async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||||
|
info!("Reloading config");
|
||||||
|
|
||||||
|
let config = get_config();
|
||||||
|
let path = config.path.clone().unwrap();
|
||||||
|
|
||||||
|
parse(&path).await?;
|
||||||
|
|
||||||
|
let config = get_config();
|
||||||
|
|
||||||
|
config.show();
|
||||||
|
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
|
||||||
|
// CommandComplete
|
||||||
|
res.put(command_complete("RELOAD"));
|
||||||
|
|
||||||
|
// ReadyForQuery
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||||
|
let guard = get_config();
|
||||||
|
let config = &*guard.clone();
|
||||||
|
let config: HashMap<String, String> = config.into();
|
||||||
|
drop(guard);
|
||||||
|
|
||||||
|
// Configs that cannot be changed dynamically.
|
||||||
|
let immutables = ["host", "port", "connect_timeout"];
|
||||||
|
|
||||||
|
// Columns
|
||||||
|
let columns = vec![
|
||||||
|
("key", DataType::Text),
|
||||||
|
("value", DataType::Text),
|
||||||
|
("default", DataType::Text),
|
||||||
|
("changeable", DataType::Text),
|
||||||
|
];
|
||||||
|
|
||||||
|
// Response data
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
|
// DataRow rows
|
||||||
|
for (key, value) in config {
|
||||||
|
let changeable = if immutables.iter().filter(|col| *col == &key).count() == 1 {
|
||||||
|
"no".to_string()
|
||||||
|
} else {
|
||||||
|
"yes".to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
let row = vec![key, value, "-".to_string(), changeable];
|
||||||
|
|
||||||
|
res.put(data_row(&row));
|
||||||
|
}
|
||||||
|
|
||||||
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SHOW STATS
|
||||||
|
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
|
let columns = vec![
|
||||||
|
("database", DataType::Text),
|
||||||
|
("total_xact_count", DataType::Numeric),
|
||||||
|
("total_query_count", DataType::Numeric),
|
||||||
|
("total_received", DataType::Numeric),
|
||||||
|
("total_sent", DataType::Numeric),
|
||||||
|
("total_xact_time", DataType::Numeric),
|
||||||
|
("total_query_time", DataType::Numeric),
|
||||||
|
("total_wait_time", DataType::Numeric),
|
||||||
|
("avg_xact_count", DataType::Numeric),
|
||||||
|
("avg_query_count", DataType::Numeric),
|
||||||
|
("avg_recv", DataType::Numeric),
|
||||||
|
("avg_sent", DataType::Numeric),
|
||||||
|
("avg_xact_time", DataType::Numeric),
|
||||||
|
("avg_query_time", DataType::Numeric),
|
||||||
|
("avg_wait_time", DataType::Numeric),
|
||||||
|
];
|
||||||
|
|
||||||
|
let stats = get_stats();
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
|
for shard in 0..pool.shards() {
|
||||||
|
for server in 0..pool.servers(shard) {
|
||||||
|
let address = pool.address(shard, server);
|
||||||
|
let stats = match stats.get(&address.id) {
|
||||||
|
Some(stats) => stats.clone(),
|
||||||
|
None => HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut row = vec![address.name()];
|
||||||
|
|
||||||
|
for column in &columns[1..] {
|
||||||
|
row.push(stats.get(column.0).unwrap_or(&0).to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
res.put(data_row(&row));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ use tokio::net::{
|
|||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use crate::admin::handle_admin;
|
||||||
use crate::config::get_config;
|
use crate::config::get_config;
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
@@ -54,6 +55,15 @@ pub struct Client {
|
|||||||
|
|
||||||
// Statistics
|
// Statistics
|
||||||
stats: Reporter,
|
stats: Reporter,
|
||||||
|
|
||||||
|
// Clients want to talk to admin
|
||||||
|
admin: bool,
|
||||||
|
|
||||||
|
// Last address the client talked to
|
||||||
|
last_address_id: Option<usize>,
|
||||||
|
|
||||||
|
// Last server process id we talked to
|
||||||
|
last_server_id: Option<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
@@ -118,6 +128,15 @@ impl Client {
|
|||||||
ready_for_query(&mut stream).await?;
|
ready_for_query(&mut stream).await?;
|
||||||
trace!("Startup OK");
|
trace!("Startup OK");
|
||||||
|
|
||||||
|
let database = parameters
|
||||||
|
.get("database")
|
||||||
|
.unwrap_or(parameters.get("user").unwrap());
|
||||||
|
let admin = ["pgcat", "pgbouncer"]
|
||||||
|
.iter()
|
||||||
|
.filter(|db| *db == &database)
|
||||||
|
.count()
|
||||||
|
== 1;
|
||||||
|
|
||||||
// Split the read and write streams
|
// Split the read and write streams
|
||||||
// so we can control buffering.
|
// so we can control buffering.
|
||||||
let (read, write) = stream.into_split();
|
let (read, write) = stream.into_split();
|
||||||
@@ -133,6 +152,9 @@ impl Client {
|
|||||||
client_server_map: client_server_map,
|
client_server_map: client_server_map,
|
||||||
parameters: parameters,
|
parameters: parameters,
|
||||||
stats: stats,
|
stats: stats,
|
||||||
|
admin: admin,
|
||||||
|
last_address_id: None,
|
||||||
|
last_server_id: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,6 +176,9 @@ impl Client {
|
|||||||
client_server_map: client_server_map,
|
client_server_map: client_server_map,
|
||||||
parameters: HashMap::new(),
|
parameters: HashMap::new(),
|
||||||
stats: stats,
|
stats: stats,
|
||||||
|
admin: false,
|
||||||
|
last_address_id: None,
|
||||||
|
last_server_id: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -204,9 +229,6 @@ impl Client {
|
|||||||
loop {
|
loop {
|
||||||
trace!("Client idle, waiting for message");
|
trace!("Client idle, waiting for message");
|
||||||
|
|
||||||
// Client idle, waiting for messages.
|
|
||||||
self.stats.client_idle(self.process_id);
|
|
||||||
|
|
||||||
// Read a complete message from the client, which normally would be
|
// Read a complete message from the client, which normally would be
|
||||||
// either a `Q` (query) or `P` (prepare, extended protocol).
|
// either a `Q` (query) or `P` (prepare, extended protocol).
|
||||||
// We can parse it here before grabbing a server from the pool,
|
// We can parse it here before grabbing a server from the pool,
|
||||||
@@ -220,6 +242,13 @@ impl Client {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle admin database real quick
|
||||||
|
if self.admin {
|
||||||
|
trace!("Handling admin command");
|
||||||
|
handle_admin(&mut self.write, message, pool.clone()).await?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Handle all custom protocol commands here.
|
// Handle all custom protocol commands here.
|
||||||
match query_router.try_execute_command(message.clone()) {
|
match query_router.try_execute_command(message.clone()) {
|
||||||
// Normal query
|
// Normal query
|
||||||
@@ -270,13 +299,13 @@ impl Client {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Waiting for server connection.
|
|
||||||
self.stats.client_waiting(self.process_id);
|
|
||||||
|
|
||||||
debug!("Waiting for connection from pool");
|
debug!("Waiting for connection from pool");
|
||||||
|
|
||||||
// Grab a server from the pool: the client issued a regular query.
|
// Grab a server from the pool: the client issued a regular query.
|
||||||
let connection = match pool.get(query_router.shard(), query_router.role()).await {
|
let connection = match pool
|
||||||
|
.get(query_router.shard(), query_router.role(), self.process_id)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(conn) => {
|
Ok(conn) => {
|
||||||
debug!("Got connection from pool");
|
debug!("Got connection from pool");
|
||||||
conn
|
conn
|
||||||
@@ -290,15 +319,23 @@ impl Client {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut reference = connection.0;
|
let mut reference = connection.0;
|
||||||
let _address = connection.1;
|
let address = connection.1;
|
||||||
let server = &mut *reference;
|
let server = &mut *reference;
|
||||||
|
|
||||||
// Claim this server as mine for query cancellation.
|
// Claim this server as mine for query cancellation.
|
||||||
server.claim(self.process_id, self.secret_key);
|
server.claim(self.process_id, self.secret_key);
|
||||||
|
|
||||||
|
// "disconnect" from the previous server stats-wise
|
||||||
|
if let Some(last_address_id) = self.last_address_id {
|
||||||
|
self.stats
|
||||||
|
.client_disconnecting(self.process_id, last_address_id);
|
||||||
|
}
|
||||||
|
|
||||||
// Client active & server active
|
// Client active & server active
|
||||||
self.stats.client_active(self.process_id);
|
self.stats.client_active(self.process_id, address.id);
|
||||||
self.stats.server_active(server.process_id());
|
self.stats.server_active(server.process_id(), address.id);
|
||||||
|
self.last_address_id = Some(address.id);
|
||||||
|
self.last_server_id = Some(server.process_id());
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Client {:?} talking to server {:?}",
|
"Client {:?} talking to server {:?}",
|
||||||
@@ -370,17 +407,17 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Report query executed statistics.
|
// Report query executed statistics.
|
||||||
self.stats.query();
|
self.stats.query(self.process_id, address.id);
|
||||||
|
|
||||||
// The transaction is over, we can release the connection back to the pool.
|
// The transaction is over, we can release the connection back to the pool.
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
// Report transaction executed statistics.
|
// Report transaction executed statistics.
|
||||||
self.stats.transaction();
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id());
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -456,15 +493,15 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Report query executed statistics.
|
// Report query executed statistics.
|
||||||
self.stats.query();
|
self.stats.query(self.process_id, address.id);
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction();
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id());
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -495,10 +532,10 @@ impl Client {
|
|||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction();
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id());
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -515,6 +552,7 @@ impl Client {
|
|||||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||||
debug!("Releasing server back into the pool");
|
debug!("Releasing server back into the pool");
|
||||||
self.release();
|
self.release();
|
||||||
|
self.stats.client_idle(self.process_id, address.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -527,6 +565,14 @@ impl Client {
|
|||||||
|
|
||||||
impl Drop for Client {
|
impl Drop for Client {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.stats.client_disconnecting(self.process_id);
|
// Disconnect the client
|
||||||
|
if let Some(address_id) = self.last_address_id {
|
||||||
|
self.stats.client_disconnecting(self.process_id, address_id);
|
||||||
|
|
||||||
|
// The server is now idle
|
||||||
|
if let Some(process_id) = self.last_server_id {
|
||||||
|
self.stats.server_idle(process_id, address_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,15 @@ pub enum Role {
|
|||||||
Replica,
|
Replica,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ToString for Role {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
|
match *self {
|
||||||
|
Role::Primary => "primary".to_string(),
|
||||||
|
Role::Replica => "replica".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PartialEq<Option<Role>> for Role {
|
impl PartialEq<Option<Role>> for Role {
|
||||||
fn eq(&self, other: &Option<Role>) -> bool {
|
fn eq(&self, other: &Option<Role>) -> bool {
|
||||||
match other {
|
match other {
|
||||||
@@ -39,23 +48,37 @@ impl PartialEq<Role> for Option<Role> {
|
|||||||
|
|
||||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
|
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
|
||||||
pub struct Address {
|
pub struct Address {
|
||||||
|
pub id: usize,
|
||||||
pub host: String,
|
pub host: String,
|
||||||
pub port: String,
|
pub port: String,
|
||||||
pub shard: usize,
|
pub shard: usize,
|
||||||
pub role: Role,
|
pub role: Role,
|
||||||
|
pub replica_number: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Address {
|
impl Default for Address {
|
||||||
fn default() -> Address {
|
fn default() -> Address {
|
||||||
Address {
|
Address {
|
||||||
|
id: 0,
|
||||||
host: String::from("127.0.0.1"),
|
host: String::from("127.0.0.1"),
|
||||||
port: String::from("5432"),
|
port: String::from("5432"),
|
||||||
shard: 0,
|
shard: 0,
|
||||||
|
replica_number: 0,
|
||||||
role: Role::Replica,
|
role: Role::Replica,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Address {
|
||||||
|
pub fn name(&self) -> String {
|
||||||
|
match self.role {
|
||||||
|
Role::Primary => format!("shard_{}_primary", self.shard),
|
||||||
|
|
||||||
|
Role::Replica => format!("shard_{}_replica_{}", self.shard, self.replica_number),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
|
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
@@ -134,6 +157,7 @@ impl Default for QueryRouter {
|
|||||||
|
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
|
pub path: Option<String>,
|
||||||
pub general: General,
|
pub general: General,
|
||||||
pub user: User,
|
pub user: User,
|
||||||
pub shards: HashMap<String, Shard>,
|
pub shards: HashMap<String, Shard>,
|
||||||
@@ -143,6 +167,7 @@ pub struct Config {
|
|||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
fn default() -> Config {
|
fn default() -> Config {
|
||||||
Config {
|
Config {
|
||||||
|
path: Some(String::from("pgcat.toml")),
|
||||||
general: General::default(),
|
general: General::default(),
|
||||||
user: User::default(),
|
user: User::default(),
|
||||||
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
||||||
@@ -151,6 +176,52 @@ impl Default for Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<&Config> for std::collections::HashMap<String, String> {
|
||||||
|
fn from(config: &Config) -> HashMap<String, String> {
|
||||||
|
HashMap::from([
|
||||||
|
("host".to_string(), config.general.host.to_string()),
|
||||||
|
("port".to_string(), config.general.port.to_string()),
|
||||||
|
(
|
||||||
|
"pool_size".to_string(),
|
||||||
|
config.general.pool_size.to_string(),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"pool_mode".to_string(),
|
||||||
|
config.general.pool_mode.to_string(),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"connect_timeout".to_string(),
|
||||||
|
config.general.connect_timeout.to_string(),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"healthcheck_timeout".to_string(),
|
||||||
|
config.general.healthcheck_timeout.to_string(),
|
||||||
|
),
|
||||||
|
("ban_time".to_string(), config.general.ban_time.to_string()),
|
||||||
|
(
|
||||||
|
"statsd_address".to_string(),
|
||||||
|
config.general.statsd_address.to_string(),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"default_role".to_string(),
|
||||||
|
config.query_router.default_role.to_string(),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"query_parser_enabled".to_string(),
|
||||||
|
config.query_router.query_parser_enabled.to_string(),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"primary_reads_enabled".to_string(),
|
||||||
|
config.query_router.primary_reads_enabled.to_string(),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"sharding_function".to_string(),
|
||||||
|
config.query_router.sharding_function.to_string(),
|
||||||
|
),
|
||||||
|
])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
pub fn show(&self) {
|
pub fn show(&self) {
|
||||||
info!("Pool size: {}", self.general.pool_size);
|
info!("Pool size: {}", self.general.pool_size);
|
||||||
@@ -189,7 +260,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let config: Config = match toml::from_str(&contents) {
|
let mut config: Config = match toml::from_str(&contents) {
|
||||||
Ok(config) => config,
|
Ok(config) => config,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Could not parse config file: {}", err.to_string());
|
error!("Could not parse config file: {}", err.to_string());
|
||||||
@@ -279,6 +350,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
config.path = Some(path.to_string());
|
||||||
|
|
||||||
CONFIG.store(Arc::new(config.clone()));
|
CONFIG.store(Arc::new(config.clone()));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -296,5 +369,6 @@ mod test {
|
|||||||
assert_eq!(get_config().shards["1"].servers[0].0, "127.0.0.1");
|
assert_eq!(get_config().shards["1"].servers[0].0, "127.0.0.1");
|
||||||
assert_eq!(get_config().shards["0"].servers[0].2, "primary");
|
assert_eq!(get_config().shards["0"].servers[0].2, "primary");
|
||||||
assert_eq!(get_config().query_router.default_role, "any");
|
assert_eq!(get_config().query_router.default_role, "any");
|
||||||
|
assert_eq!(get_config().path, Some("pgcat.toml".to_string()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,3 +20,8 @@ pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;
|
|||||||
|
|
||||||
// ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows.
|
// ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows.
|
||||||
pub const MESSAGE_TERMINATOR: u8 = 0;
|
pub const MESSAGE_TERMINATOR: u8 = 0;
|
||||||
|
|
||||||
|
//
|
||||||
|
// Data types
|
||||||
|
//
|
||||||
|
pub const _OID_INT8: i32 = 20; // bigint
|
||||||
|
|||||||
16
src/main.rs
16
src/main.rs
@@ -31,7 +31,6 @@ extern crate once_cell;
|
|||||||
extern crate serde;
|
extern crate serde;
|
||||||
extern crate serde_derive;
|
extern crate serde_derive;
|
||||||
extern crate sqlparser;
|
extern crate sqlparser;
|
||||||
extern crate statsd;
|
|
||||||
extern crate tokio;
|
extern crate tokio;
|
||||||
extern crate toml;
|
extern crate toml;
|
||||||
|
|
||||||
@@ -47,6 +46,7 @@ use tokio::{
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
mod admin;
|
||||||
mod client;
|
mod client;
|
||||||
mod config;
|
mod config;
|
||||||
mod constants;
|
mod constants;
|
||||||
@@ -112,15 +112,19 @@ async fn main() {
|
|||||||
|
|
||||||
// Collect statistics and send them to StatsD
|
// Collect statistics and send them to StatsD
|
||||||
let (tx, rx) = mpsc::channel(100);
|
let (tx, rx) = mpsc::channel(100);
|
||||||
let collector_tx = tx.clone();
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
let mut stats_collector = Collector::new(rx, collector_tx);
|
|
||||||
stats_collector.collect().await;
|
|
||||||
});
|
|
||||||
|
|
||||||
|
// Connection pool for all shards and replicas
|
||||||
let mut pool =
|
let mut pool =
|
||||||
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
|
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
|
||||||
|
|
||||||
|
let collector_tx = tx.clone();
|
||||||
|
let addresses = pool.databases();
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let mut stats_collector = Collector::new(rx, collector_tx);
|
||||||
|
stats_collector.collect(addresses).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Connect to all servers and validate their versions.
|
||||||
let server_info = match pool.validate().await {
|
let server_info = match pool.validate().await {
|
||||||
Ok(info) => info,
|
Ok(info) => info,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|||||||
162
src/messages.rs
162
src/messages.rs
@@ -8,9 +8,26 @@ use tokio::net::{
|
|||||||
TcpStream,
|
TcpStream,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::errors::Error;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::errors::Error;
|
/// Postgres data type mappings
|
||||||
|
/// used in RowDescription ('T') message.
|
||||||
|
pub enum DataType {
|
||||||
|
Text,
|
||||||
|
Int4,
|
||||||
|
Numeric,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&DataType> for i32 {
|
||||||
|
fn from(data_type: &DataType) -> i32 {
|
||||||
|
match data_type {
|
||||||
|
DataType::Text => 25,
|
||||||
|
DataType::Int4 => 23,
|
||||||
|
DataType::Numeric => 1700,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Tell the client that authentication handshake completed successfully.
|
/// Tell the client that authentication handshake completed successfully.
|
||||||
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||||
@@ -91,9 +108,8 @@ pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Resu
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse StartupMessage parameters.
|
/// Parse the params the server sends as a key/value format.
|
||||||
/// e.g. user, database, application_name, etc.
|
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||||
pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
|
||||||
let mut result = HashMap::new();
|
let mut result = HashMap::new();
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
let mut tmp = String::new();
|
let mut tmp = String::new();
|
||||||
@@ -115,7 +131,7 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
|
|||||||
|
|
||||||
// Expect pairs of name and value
|
// Expect pairs of name and value
|
||||||
// and at least one pair to be present.
|
// and at least one pair to be present.
|
||||||
if buf.len() % 2 != 0 && buf.len() >= 2 {
|
if buf.len() % 2 != 0 || buf.len() < 2 {
|
||||||
return Err(Error::ClientBadStartup);
|
return Err(Error::ClientBadStartup);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,6 +143,14 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
|
|||||||
i += 2;
|
i += 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse StartupMessage parameters.
|
||||||
|
/// e.g. user, database, application_name, etc.
|
||||||
|
pub fn parse_startup(bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||||
|
let result = parse_params(bytes)?;
|
||||||
|
|
||||||
// Minimum required parameters
|
// Minimum required parameters
|
||||||
// I want to have the user at the very minimum, according to the protocol spec.
|
// I want to have the user at the very minimum, according to the protocol spec.
|
||||||
if !result.contains_key("user") {
|
if !result.contains_key("user") {
|
||||||
@@ -252,68 +276,17 @@ pub async fn show_response(
|
|||||||
// 3. CommandComplete
|
// 3. CommandComplete
|
||||||
// 4. ReadyForQuery
|
// 4. ReadyForQuery
|
||||||
|
|
||||||
// RowDescription
|
|
||||||
let mut row_desc = BytesMut::new();
|
|
||||||
|
|
||||||
// Number of columns: 1
|
|
||||||
row_desc.put_i16(1);
|
|
||||||
|
|
||||||
// Column name
|
|
||||||
row_desc.put_slice(&format!("{}\0", name).as_bytes());
|
|
||||||
|
|
||||||
// Doesn't belong to any table
|
|
||||||
row_desc.put_i32(0);
|
|
||||||
|
|
||||||
// Doesn't belong to any table
|
|
||||||
row_desc.put_i16(0);
|
|
||||||
|
|
||||||
// Text
|
|
||||||
row_desc.put_i32(25);
|
|
||||||
|
|
||||||
// Text size = variable (-1)
|
|
||||||
row_desc.put_i16(-1);
|
|
||||||
|
|
||||||
// Type modifier: none that I know
|
|
||||||
row_desc.put_i32(0);
|
|
||||||
|
|
||||||
// Format being used: text (0), binary (1)
|
|
||||||
row_desc.put_i16(0);
|
|
||||||
|
|
||||||
// DataRow
|
|
||||||
let mut data_row = BytesMut::new();
|
|
||||||
|
|
||||||
// Number of columns
|
|
||||||
data_row.put_i16(1);
|
|
||||||
|
|
||||||
// Size of the column content (length of the string really)
|
|
||||||
data_row.put_i32(value.len() as i32);
|
|
||||||
|
|
||||||
// The content
|
|
||||||
data_row.put_slice(value.as_bytes());
|
|
||||||
|
|
||||||
// CommandComplete
|
|
||||||
let mut command_complete = BytesMut::new();
|
|
||||||
|
|
||||||
// Number of rows returned (just one)
|
|
||||||
command_complete.put_slice(&b"SELECT 1\0"[..]);
|
|
||||||
|
|
||||||
// The final messages sent to the client
|
// The final messages sent to the client
|
||||||
let mut res = BytesMut::new();
|
let mut res = BytesMut::new();
|
||||||
|
|
||||||
// RowDescription
|
// RowDescription
|
||||||
res.put_u8(b'T');
|
res.put(row_description(&vec![(name, DataType::Text)]));
|
||||||
res.put_i32(row_desc.len() as i32 + 4);
|
|
||||||
res.put(row_desc);
|
|
||||||
|
|
||||||
// DataRow
|
// DataRow
|
||||||
res.put_u8(b'D');
|
res.put(data_row(&vec![value.to_string()]));
|
||||||
res.put_i32(data_row.len() as i32 + 4);
|
|
||||||
res.put(data_row);
|
|
||||||
|
|
||||||
// CommandComplete
|
// CommandComplete
|
||||||
res.put_u8(b'C');
|
res.put(command_complete("SELECT 1"));
|
||||||
res.put_i32(command_complete.len() as i32 + 4);
|
|
||||||
res.put(command_complete);
|
|
||||||
|
|
||||||
// ReadyForQuery
|
// ReadyForQuery
|
||||||
res.put_u8(b'Z');
|
res.put_u8(b'Z');
|
||||||
@@ -323,6 +296,77 @@ pub async fn show_response(
|
|||||||
write_all_half(stream, res).await
|
write_all_half(stream, res).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
let mut row_desc = BytesMut::new();
|
||||||
|
|
||||||
|
// how many colums we are storing
|
||||||
|
row_desc.put_i16(columns.len() as i16);
|
||||||
|
|
||||||
|
for (name, data_type) in columns {
|
||||||
|
// Column name
|
||||||
|
row_desc.put_slice(&format!("{}\0", name).as_bytes());
|
||||||
|
|
||||||
|
// Doesn't belong to any table
|
||||||
|
row_desc.put_i32(0);
|
||||||
|
|
||||||
|
// Doesn't belong to any table
|
||||||
|
row_desc.put_i16(0);
|
||||||
|
|
||||||
|
// Text
|
||||||
|
row_desc.put_i32(data_type.into());
|
||||||
|
|
||||||
|
// Text size = variable (-1)
|
||||||
|
let type_size = match data_type {
|
||||||
|
DataType::Text => -1,
|
||||||
|
DataType::Int4 => 4,
|
||||||
|
DataType::Numeric => -1,
|
||||||
|
};
|
||||||
|
|
||||||
|
row_desc.put_i16(type_size);
|
||||||
|
|
||||||
|
// Type modifier: none that I know
|
||||||
|
row_desc.put_i32(-1);
|
||||||
|
|
||||||
|
// Format being used: text (0), binary (1)
|
||||||
|
row_desc.put_i16(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
res.put_u8(b'T');
|
||||||
|
res.put_i32(row_desc.len() as i32 + 4);
|
||||||
|
res.put(row_desc);
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn data_row(row: &Vec<String>) -> BytesMut {
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
let mut data_row = BytesMut::new();
|
||||||
|
|
||||||
|
data_row.put_i16(row.len() as i16);
|
||||||
|
|
||||||
|
for column in row {
|
||||||
|
let column = column.as_bytes();
|
||||||
|
data_row.put_i32(column.len() as i32);
|
||||||
|
data_row.put_slice(&column);
|
||||||
|
}
|
||||||
|
|
||||||
|
res.put_u8(b'D');
|
||||||
|
res.put_i32(data_row.len() as i32 + 4);
|
||||||
|
res.put(data_row);
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn command_complete(command: &str) -> BytesMut {
|
||||||
|
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
res.put_u8(b'C');
|
||||||
|
res.put_i32(cmd.len() as i32 + 4);
|
||||||
|
res.put(cmd);
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
/// Write all data in the buffer to the TcpStream.
|
/// Write all data in the buffer to the TcpStream.
|
||||||
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
||||||
match stream.write_all(&buf).await {
|
match stream.write_all(&buf).await {
|
||||||
|
|||||||
84
src/pool.rs
84
src/pool.rs
@@ -38,6 +38,7 @@ impl ConnectionPool {
|
|||||||
let mut shards = Vec::new();
|
let mut shards = Vec::new();
|
||||||
let mut addresses = Vec::new();
|
let mut addresses = Vec::new();
|
||||||
let mut banlist = Vec::new();
|
let mut banlist = Vec::new();
|
||||||
|
let mut address_id = 0;
|
||||||
let mut shard_ids = config
|
let mut shard_ids = config
|
||||||
.shards
|
.shards
|
||||||
.clone()
|
.clone()
|
||||||
@@ -49,7 +50,8 @@ impl ConnectionPool {
|
|||||||
for shard_idx in shard_ids {
|
for shard_idx in shard_ids {
|
||||||
let shard = &config.shards[&shard_idx];
|
let shard = &config.shards[&shard_idx];
|
||||||
let mut pools = Vec::new();
|
let mut pools = Vec::new();
|
||||||
let mut replica_addresses = Vec::new();
|
let mut servers = Vec::new();
|
||||||
|
let mut replica_number = 0;
|
||||||
|
|
||||||
for server in shard.servers.iter() {
|
for server in shard.servers.iter() {
|
||||||
let role = match server.2.as_ref() {
|
let role = match server.2.as_ref() {
|
||||||
@@ -62,12 +64,20 @@ impl ConnectionPool {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let address = Address {
|
let address = Address {
|
||||||
|
id: address_id,
|
||||||
host: server.0.clone(),
|
host: server.0.clone(),
|
||||||
port: server.1.to_string(),
|
port: server.1.to_string(),
|
||||||
role: role,
|
role: role,
|
||||||
|
replica_number,
|
||||||
shard: shard_idx.parse::<usize>().unwrap(),
|
shard: shard_idx.parse::<usize>().unwrap(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
address_id += 1;
|
||||||
|
|
||||||
|
if role == Role::Replica {
|
||||||
|
replica_number += 1;
|
||||||
|
}
|
||||||
|
|
||||||
let manager = ServerPool::new(
|
let manager = ServerPool::new(
|
||||||
address.clone(),
|
address.clone(),
|
||||||
config.user.clone(),
|
config.user.clone(),
|
||||||
@@ -87,11 +97,11 @@ impl ConnectionPool {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
pools.push(pool);
|
pools.push(pool);
|
||||||
replica_addresses.push(address);
|
servers.push(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
shards.push(pools);
|
shards.push(pools);
|
||||||
addresses.push(replica_addresses);
|
addresses.push(servers);
|
||||||
banlist.push(HashMap::new());
|
banlist.push(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,9 +125,13 @@ impl ConnectionPool {
|
|||||||
pub async fn validate(&mut self) -> Result<BytesMut, Error> {
|
pub async fn validate(&mut self) -> Result<BytesMut, Error> {
|
||||||
let mut server_infos = Vec::new();
|
let mut server_infos = Vec::new();
|
||||||
|
|
||||||
|
let stats = self.stats.clone();
|
||||||
for shard in 0..self.shards() {
|
for shard in 0..self.shards() {
|
||||||
for _ in 0..self.servers(shard) {
|
for _ in 0..self.servers(shard) {
|
||||||
let connection = match self.get(shard, None).await {
|
// To keep stats consistent.
|
||||||
|
let fake_process_id = 0;
|
||||||
|
|
||||||
|
let connection = match self.get(shard, None, fake_process_id).await {
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Shard {} down or misconfigured: {:?}", shard, err);
|
error!("Shard {} down or misconfigured: {:?}", shard, err);
|
||||||
@@ -126,10 +140,24 @@ impl ConnectionPool {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut proxy = connection.0;
|
let mut proxy = connection.0;
|
||||||
let _address = connection.1;
|
let address = connection.1;
|
||||||
let server = &mut *proxy;
|
let server = &mut *proxy;
|
||||||
|
|
||||||
server_infos.push(server.server_info());
|
let server_info = server.server_info();
|
||||||
|
|
||||||
|
stats.client_disconnecting(fake_process_id, address.id);
|
||||||
|
|
||||||
|
if server_infos.len() > 0 {
|
||||||
|
// Compare against the last server checked.
|
||||||
|
if server_info != server_infos[server_infos.len() - 1] {
|
||||||
|
warn!(
|
||||||
|
"{:?} has different server configuration than the last server",
|
||||||
|
address
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
server_infos.push(server_info);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,6 +175,7 @@ impl ConnectionPool {
|
|||||||
&mut self,
|
&mut self,
|
||||||
shard: usize,
|
shard: usize,
|
||||||
role: Option<Role>,
|
role: Option<Role>,
|
||||||
|
process_id: i32,
|
||||||
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
|
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let addresses = &self.addresses[shard];
|
let addresses = &self.addresses[shard];
|
||||||
@@ -182,6 +211,8 @@ impl ConnectionPool {
|
|||||||
let index = self.round_robin % addresses.len();
|
let index = self.round_robin % addresses.len();
|
||||||
let address = &addresses[index];
|
let address = &addresses[index];
|
||||||
|
|
||||||
|
self.stats.client_waiting(process_id, address.id);
|
||||||
|
|
||||||
// Make sure you're getting a primary or a replica
|
// Make sure you're getting a primary or a replica
|
||||||
// as per request. If no specific role is requested, the first
|
// as per request. If no specific role is requested, the first
|
||||||
// available will be chosen.
|
// available will be chosen.
|
||||||
@@ -201,6 +232,9 @@ impl ConnectionPool {
|
|||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Banning replica {}, error: {:?}", index, err);
|
error!("Banning replica {}, error: {:?}", index, err);
|
||||||
self.ban(address, shard);
|
self.ban(address, shard);
|
||||||
|
self.stats.client_disconnecting(process_id, address.id);
|
||||||
|
self.stats
|
||||||
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -209,7 +243,7 @@ impl ConnectionPool {
|
|||||||
let server = &mut *conn;
|
let server = &mut *conn;
|
||||||
let healthcheck_timeout = get_config().general.healthcheck_timeout;
|
let healthcheck_timeout = get_config().general.healthcheck_timeout;
|
||||||
|
|
||||||
self.stats.server_tested(server.process_id());
|
self.stats.server_tested(server.process_id(), address.id);
|
||||||
|
|
||||||
match tokio::time::timeout(
|
match tokio::time::timeout(
|
||||||
tokio::time::Duration::from_millis(healthcheck_timeout),
|
tokio::time::Duration::from_millis(healthcheck_timeout),
|
||||||
@@ -220,8 +254,9 @@ impl ConnectionPool {
|
|||||||
// Check if health check succeeded
|
// Check if health check succeeded
|
||||||
Ok(res) => match res {
|
Ok(res) => match res {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
self.stats.checkout_time(now.elapsed().as_micros());
|
self.stats
|
||||||
self.stats.server_idle(conn.process_id());
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
|
self.stats.server_idle(conn.process_id(), address.id);
|
||||||
return Ok((conn, address.clone()));
|
return Ok((conn, address.clone()));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
@@ -230,6 +265,9 @@ impl ConnectionPool {
|
|||||||
server.mark_bad();
|
server.mark_bad();
|
||||||
|
|
||||||
self.ban(address, shard);
|
self.ban(address, shard);
|
||||||
|
self.stats.client_disconnecting(process_id, address.id);
|
||||||
|
self.stats
|
||||||
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -240,6 +278,9 @@ impl ConnectionPool {
|
|||||||
server.mark_bad();
|
server.mark_bad();
|
||||||
|
|
||||||
self.ban(address, shard);
|
self.ban(address, shard);
|
||||||
|
self.stats.client_disconnecting(process_id, address.id);
|
||||||
|
self.stats
|
||||||
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -324,6 +365,22 @@ impl ConnectionPool {
|
|||||||
pub fn servers(&self, shard: usize) -> usize {
|
pub fn servers(&self, shard: usize) -> usize {
|
||||||
self.addresses[shard].len()
|
self.addresses[shard].len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn databases(&self) -> usize {
|
||||||
|
let mut databases = 0;
|
||||||
|
for shard in 0..self.shards() {
|
||||||
|
databases += self.servers(shard);
|
||||||
|
}
|
||||||
|
databases
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
|
||||||
|
self.databases[shard][server].state()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn address(&self, shard: usize, server: usize) -> &Address {
|
||||||
|
&self.addresses[shard][server]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ServerPool {
|
pub struct ServerPool {
|
||||||
@@ -361,13 +418,14 @@ impl ManageConnection for ServerPool {
|
|||||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||||
info!(
|
info!(
|
||||||
"Creating a new connection to {:?} using user {:?}",
|
"Creating a new connection to {:?} using user {:?}",
|
||||||
self.address, self.user.name
|
self.address.name(),
|
||||||
|
self.user.name
|
||||||
);
|
);
|
||||||
|
|
||||||
// Put a temporary process_id into the stats
|
// Put a temporary process_id into the stats
|
||||||
// for server login.
|
// for server login.
|
||||||
let process_id = rand::random::<i32>();
|
let process_id = rand::random::<i32>();
|
||||||
self.stats.server_login(process_id);
|
self.stats.server_login(process_id, self.address.id);
|
||||||
|
|
||||||
match Server::startup(
|
match Server::startup(
|
||||||
&self.address,
|
&self.address,
|
||||||
@@ -380,12 +438,12 @@ impl ManageConnection for ServerPool {
|
|||||||
{
|
{
|
||||||
Ok(conn) => {
|
Ok(conn) => {
|
||||||
// Remove the temporary process_id from the stats.
|
// Remove the temporary process_id from the stats.
|
||||||
self.stats.server_disconnecting(process_id);
|
self.stats.server_disconnecting(process_id, self.address.id);
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Remove the temporary process_id from the stats.
|
// Remove the temporary process_id from the stats.
|
||||||
self.stats.server_disconnecting(process_id);
|
self.stats.server_disconnecting(process_id, self.address.id);
|
||||||
Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ use sqlparser::parser::Parser;
|
|||||||
|
|
||||||
const CUSTOM_SQL_REGEXES: [&str; 5] = [
|
const CUSTOM_SQL_REGEXES: [&str; 5] = [
|
||||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||||
r"(?i)^ *SET SHARD TO '?([0-9]+)'? *;? *$",
|
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
||||||
r"(?i)^ *SHOW SHARD *;? *$",
|
r"(?i)^ *SHOW SHARD *;? *$",
|
||||||
r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
|
r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
|
||||||
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
||||||
@@ -192,7 +192,10 @@ impl QueryRouter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Command::SetShard => {
|
Command::SetShard => {
|
||||||
self.active_shard = Some(value.parse::<usize>().unwrap());
|
self.active_shard = match value.to_ascii_uppercase().as_ref() {
|
||||||
|
"ANY" => Some(rand::random::<usize>() % self.shards),
|
||||||
|
_ => Some(value.parse::<usize>().unwrap()),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
Command::SetServerRole => {
|
Command::SetServerRole => {
|
||||||
|
|||||||
@@ -268,7 +268,8 @@ impl Server {
|
|||||||
|
|
||||||
/// Send messages to the server from the client.
|
/// Send messages to the server from the client.
|
||||||
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
||||||
self.stats.data_sent(messages.len());
|
self.stats
|
||||||
|
.data_sent(messages.len(), self.process_id, self.address.id);
|
||||||
|
|
||||||
match write_all_half(&mut self.write, messages).await {
|
match write_all_half(&mut self.write, messages).await {
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
@@ -374,7 +375,8 @@ impl Server {
|
|||||||
let bytes = self.buffer.clone();
|
let bytes = self.buffer.clone();
|
||||||
|
|
||||||
// Keep track of how much data we got from the server for stats.
|
// Keep track of how much data we got from the server for stats.
|
||||||
self.stats.data_received(bytes.len());
|
self.stats
|
||||||
|
.data_received(bytes.len(), self.process_id, self.address.id);
|
||||||
|
|
||||||
// Clear the buffer for next query.
|
// Clear the buffer for next query.
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
@@ -470,7 +472,8 @@ impl Drop for Server {
|
|||||||
/// the socket is in non-blocking mode, so it may not be ready
|
/// the socket is in non-blocking mode, so it may not be ready
|
||||||
/// for a write.
|
/// for a write.
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.stats.server_disconnecting(self.process_id());
|
self.stats
|
||||||
|
.server_disconnecting(self.process_id(), self.address.id);
|
||||||
|
|
||||||
let mut bytes = BytesMut::with_capacity(4);
|
let mut bytes = BytesMut::with_capacity(4);
|
||||||
bytes.put_u8(b'X');
|
bytes.put_u8(b'X');
|
||||||
|
|||||||
308
src/stats.rs
308
src/stats.rs
@@ -1,12 +1,20 @@
|
|||||||
|
/// Statistics and reporting.
|
||||||
use log::info;
|
use log::info;
|
||||||
use statsd::Client;
|
use once_cell::sync::Lazy;
|
||||||
/// Events collector and publisher.
|
use parking_lot::Mutex;
|
||||||
|
use std::collections::HashMap;
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
|
|
||||||
use std::collections::HashMap;
|
// Latest stats updated every second; used in SHOW STATS and other admin commands.
|
||||||
|
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
|
||||||
|
Lazy::new(|| Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
use crate::config::get_config;
|
// Statistics period used for average calculations.
|
||||||
|
// 15 seconds.
|
||||||
|
static STAT_PERIOD: u64 = 15000;
|
||||||
|
|
||||||
|
/// The names for the events reported
|
||||||
|
/// to the statistics collector.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
enum EventName {
|
enum EventName {
|
||||||
CheckoutTime,
|
CheckoutTime,
|
||||||
@@ -23,201 +31,266 @@ enum EventName {
|
|||||||
ServerTested,
|
ServerTested,
|
||||||
ServerLogin,
|
ServerLogin,
|
||||||
ServerDisconnecting,
|
ServerDisconnecting,
|
||||||
FlushStatsToStatsD,
|
UpdateStats,
|
||||||
|
UpdateAverages,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Event data sent to the collector
|
||||||
|
/// from clients and servers.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Event {
|
pub struct Event {
|
||||||
|
/// The name of the event being reported.
|
||||||
name: EventName,
|
name: EventName,
|
||||||
|
|
||||||
|
/// The value being reported. Meaning differs based on event name.
|
||||||
value: i64,
|
value: i64,
|
||||||
process_id: Option<i32>,
|
|
||||||
|
/// The client or server connection reporting the event.
|
||||||
|
process_id: i32,
|
||||||
|
|
||||||
|
/// The server the client is connected to.
|
||||||
|
address_id: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The statistics reporter. An instance is given
|
||||||
|
/// to each possible source of statistics,
|
||||||
|
/// e.g. clients, servers, connection pool.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Reporter {
|
pub struct Reporter {
|
||||||
tx: Sender<Event>,
|
tx: Sender<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Reporter {
|
impl Reporter {
|
||||||
|
/// Create a new Reporter instance.
|
||||||
pub fn new(tx: Sender<Event>) -> Reporter {
|
pub fn new(tx: Sender<Event>) -> Reporter {
|
||||||
Reporter { tx: tx }
|
Reporter { tx: tx }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn query(&self) {
|
/// Report a query executed by a client against
|
||||||
|
/// a server identified by the `address_id`.
|
||||||
|
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::Query,
|
name: EventName::Query,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: None,
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn transaction(&self) {
|
/// Report a transaction executed by a client against
|
||||||
|
/// a server identified by the `address_id`.
|
||||||
|
pub fn transaction(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::Transaction,
|
name: EventName::Transaction,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: None,
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_sent(&self, amount: usize) {
|
/// Report data sent to a server identified by `address_id`.
|
||||||
|
/// The `amount` is measured in bytes.
|
||||||
|
pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::DataSent,
|
name: EventName::DataSent,
|
||||||
value: amount as i64,
|
value: amount as i64,
|
||||||
process_id: None,
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_received(&self, amount: usize) {
|
/// Report data received from a server identified by `address_id`.
|
||||||
|
/// The `amount` is measured in bytes.
|
||||||
|
pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::DataReceived,
|
name: EventName::DataReceived,
|
||||||
value: amount as i64,
|
value: amount as i64,
|
||||||
process_id: None,
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn checkout_time(&self, ms: u128) {
|
/// Time spent waiting to get a healthy connection from the pool
|
||||||
|
/// for a server identified by `address_id`.
|
||||||
|
/// Measured in milliseconds.
|
||||||
|
pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::CheckoutTime,
|
name: EventName::CheckoutTime,
|
||||||
value: ms as i64,
|
value: ms as i64,
|
||||||
process_id: None,
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_waiting(&self, process_id: i32) {
|
/// Reports a client identified by `process_id` waiting for a connection
|
||||||
|
/// to a server identified by `address_id`.
|
||||||
|
pub fn client_waiting(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientWaiting,
|
name: EventName::ClientWaiting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_active(&self, process_id: i32) {
|
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||||
|
/// to a server identified by `address_id` and is about to query the server.
|
||||||
|
pub fn client_active(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientActive,
|
name: EventName::ClientActive,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_idle(&self, process_id: i32) {
|
/// Reports a client identified by `process_id` is done querying the server
|
||||||
|
/// identified by `address_id` and is no longer active.
|
||||||
|
pub fn client_idle(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientIdle,
|
name: EventName::ClientIdle,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_disconnecting(&self, process_id: i32) {
|
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
||||||
|
/// The last server it was connected to is identified by `address_id`.
|
||||||
|
pub fn client_disconnecting(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientDisconnecting,
|
name: EventName::ClientDisconnecting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_active(&self, process_id: i32) {
|
/// Reports a server connection identified by `process_id` for
|
||||||
|
/// a configured server identified by `address_id` is actively used
|
||||||
|
/// by a client.
|
||||||
|
pub fn server_active(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerActive,
|
name: EventName::ServerActive,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_idle(&self, process_id: i32) {
|
/// Reports a server connection identified by `process_id` for
|
||||||
|
/// a configured server identified by `address_id` is no longer
|
||||||
|
/// actively used by a client and is now idle.
|
||||||
|
pub fn server_idle(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerIdle,
|
name: EventName::ServerIdle,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_login(&self, process_id: i32) {
|
/// Reports a server connection identified by `process_id` for
|
||||||
|
/// a configured server identified by `address_id` is attempting
|
||||||
|
/// to login.
|
||||||
|
pub fn server_login(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerLogin,
|
name: EventName::ServerLogin,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_tested(&self, process_id: i32) {
|
/// Reports a server connection identified by `process_id` for
|
||||||
|
/// a configured server identified by `address_id` is being
|
||||||
|
/// tested before being given to a client.
|
||||||
|
pub fn server_tested(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerTested,
|
name: EventName::ServerTested,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn server_disconnecting(&self, process_id: i32) {
|
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
||||||
|
/// The configured server it was connected to is identified by `address_id`.
|
||||||
|
pub fn server_disconnecting(&self, process_id: i32, address_id: usize) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerDisconnecting,
|
name: EventName::ServerDisconnecting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: Some(process_id),
|
process_id: process_id,
|
||||||
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub fn flush_to_statsd(&self) {
|
|
||||||
// let event = Event {
|
|
||||||
// name: EventName::FlushStatsToStatsD,
|
|
||||||
// value: 0,
|
|
||||||
// process_id: None,
|
|
||||||
// };
|
|
||||||
|
|
||||||
// let _ = self.tx.try_send(event);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The statistics collector which is receiving statistics
|
||||||
|
/// from clients, servers, and the connection pool. There is
|
||||||
|
/// only one collector (kind of like a singleton).
|
||||||
|
/// The collector can trigger events on its own, e.g.
|
||||||
|
/// it updates aggregates every second and averages every
|
||||||
|
/// 15 seconds.
|
||||||
pub struct Collector {
|
pub struct Collector {
|
||||||
rx: Receiver<Event>,
|
rx: Receiver<Event>,
|
||||||
tx: Sender<Event>,
|
tx: Sender<Event>,
|
||||||
client: Client,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Collector {
|
impl Collector {
|
||||||
|
/// Create a new collector instance. There should only be one instance
|
||||||
|
/// at a time. This is ensured by mpsc which allows only one receiver.
|
||||||
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
|
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
|
||||||
Collector {
|
Collector { rx, tx }
|
||||||
rx,
|
|
||||||
tx,
|
|
||||||
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn collect(&mut self) {
|
/// The statistics collection handler. It will collect statistics
|
||||||
|
/// for `address_id`s starting at 0 up to `addresses`.
|
||||||
|
pub async fn collect(&mut self, addresses: usize) {
|
||||||
info!("Events reporter started");
|
info!("Events reporter started");
|
||||||
|
|
||||||
let mut stats = HashMap::from([
|
let stats_template = HashMap::from([
|
||||||
("total_query_count", 0),
|
("total_query_count", 0),
|
||||||
("total_xact_count", 0),
|
("total_xact_count", 0),
|
||||||
("total_sent", 0),
|
("total_sent", 0),
|
||||||
("total_received", 0),
|
("total_received", 0),
|
||||||
|
("total_xact_time", 0),
|
||||||
|
("total_query_time", 0),
|
||||||
("total_wait_time", 0),
|
("total_wait_time", 0),
|
||||||
|
("avg_xact_time", 0),
|
||||||
|
("avg_query_time", 0),
|
||||||
|
("avg_xact_count", 0),
|
||||||
|
("avg_sent", 0),
|
||||||
|
("avg_received", 0),
|
||||||
|
("avg_wait_time", 0),
|
||||||
("maxwait_us", 0),
|
("maxwait_us", 0),
|
||||||
("maxwait", 0),
|
("maxwait", 0),
|
||||||
("cl_waiting", 0),
|
("cl_waiting", 0),
|
||||||
@@ -229,21 +302,51 @@ impl Collector {
|
|||||||
("sv_tested", 0),
|
("sv_tested", 0),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let mut client_server_states: HashMap<i32, EventName> = HashMap::new();
|
let mut stats = HashMap::new();
|
||||||
let tx = self.tx.clone();
|
|
||||||
|
|
||||||
|
// Stats saved after each iteration of the flush event. Used in calculation
|
||||||
|
// of averages in the last flush period.
|
||||||
|
let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new();
|
||||||
|
|
||||||
|
// Track which state the client and server are at any given time.
|
||||||
|
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
||||||
|
|
||||||
|
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||||
|
let tx = self.tx.clone();
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15000));
|
let mut interval =
|
||||||
|
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15));
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let _ = tx.try_send(Event {
|
for address_id in 0..addresses {
|
||||||
name: EventName::FlushStatsToStatsD,
|
let _ = tx.try_send(Event {
|
||||||
value: 0,
|
name: EventName::UpdateStats,
|
||||||
process_id: None,
|
value: 0,
|
||||||
});
|
process_id: -1,
|
||||||
|
address_id: address_id,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let tx = self.tx.clone();
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let mut interval =
|
||||||
|
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
for address_id in 0..addresses {
|
||||||
|
let _ = tx.try_send(Event {
|
||||||
|
name: EventName::UpdateAverages,
|
||||||
|
value: 0,
|
||||||
|
process_id: -1,
|
||||||
|
address_id: address_id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// The collector loop
|
||||||
loop {
|
loop {
|
||||||
let stat = match self.rx.recv().await {
|
let stat = match self.rx.recv().await {
|
||||||
Some(stat) => stat,
|
Some(stat) => stat,
|
||||||
@@ -253,6 +356,14 @@ impl Collector {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let stats = stats
|
||||||
|
.entry(stat.address_id)
|
||||||
|
.or_insert(stats_template.clone());
|
||||||
|
let client_server_states = client_server_states
|
||||||
|
.entry(stat.address_id)
|
||||||
|
.or_insert(HashMap::new());
|
||||||
|
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
||||||
|
|
||||||
// Some are counters, some are gauges...
|
// Some are counters, some are gauges...
|
||||||
match stat.name {
|
match stat.name {
|
||||||
EventName::Query => {
|
EventName::Query => {
|
||||||
@@ -280,10 +391,11 @@ impl Collector {
|
|||||||
*counter += stat.value;
|
*counter += stat.value;
|
||||||
|
|
||||||
let counter = stats.entry("maxwait_us").or_insert(0);
|
let counter = stats.entry("maxwait_us").or_insert(0);
|
||||||
|
let mic_part = stat.value % 1_000_000;
|
||||||
|
|
||||||
// Report max time here
|
// Report max time here
|
||||||
if stat.value > *counter {
|
if mic_part > *counter {
|
||||||
*counter = stat.value;
|
*counter = mic_part;
|
||||||
}
|
}
|
||||||
|
|
||||||
let counter = stats.entry("maxwait").or_insert(0);
|
let counter = stats.entry("maxwait").or_insert(0);
|
||||||
@@ -301,15 +413,16 @@ impl Collector {
|
|||||||
| EventName::ServerIdle
|
| EventName::ServerIdle
|
||||||
| EventName::ServerTested
|
| EventName::ServerTested
|
||||||
| EventName::ServerLogin => {
|
| EventName::ServerLogin => {
|
||||||
client_server_states.insert(stat.process_id.unwrap(), stat.name);
|
client_server_states.insert(stat.process_id, stat.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
|
EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
|
||||||
client_server_states.remove(&stat.process_id.unwrap());
|
client_server_states.remove(&stat.process_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
EventName::FlushStatsToStatsD => {
|
EventName::UpdateStats => {
|
||||||
for (_, state) in &client_server_states {
|
// Calculate connection states
|
||||||
|
for (_, state) in client_server_states.iter() {
|
||||||
match state {
|
match state {
|
||||||
EventName::ClientActive => {
|
EventName::ClientActive => {
|
||||||
let counter = stats.entry("cl_active").or_insert(0);
|
let counter = stats.entry("cl_active").or_insert(0);
|
||||||
@@ -321,11 +434,6 @@ impl Collector {
|
|||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
EventName::ClientIdle => {
|
|
||||||
let counter = stats.entry("cl_idle").or_insert(0);
|
|
||||||
*counter += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
EventName::ServerIdle => {
|
EventName::ServerIdle => {
|
||||||
let counter = stats.entry("sv_idle").or_insert(0);
|
let counter = stats.entry("sv_idle").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
@@ -346,22 +454,64 @@ impl Collector {
|
|||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EventName::ClientIdle => {
|
||||||
|
let counter = stats.entry("cl_idle").or_insert(0);
|
||||||
|
*counter += 1;
|
||||||
|
}
|
||||||
|
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("{:?}", stats);
|
// Update latest stats used in SHOW STATS
|
||||||
|
let mut guard = LATEST_STATS.lock();
|
||||||
let mut pipeline = self.client.pipeline();
|
for (key, value) in stats.iter() {
|
||||||
|
let entry = guard.entry(stat.address_id).or_insert(HashMap::new());
|
||||||
for (key, value) in stats.iter_mut() {
|
entry.insert(key.to_string(), value.clone());
|
||||||
pipeline.gauge(key, *value as f64);
|
|
||||||
*value = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pipeline.send(&self.client);
|
// These are re-calculated every iteration of the loop, so we don't want to add values
|
||||||
|
// from the last iteration.
|
||||||
|
for stat in &[
|
||||||
|
"cl_active",
|
||||||
|
"cl_waiting",
|
||||||
|
"cl_idle",
|
||||||
|
"sv_idle",
|
||||||
|
"sv_active",
|
||||||
|
"sv_tested",
|
||||||
|
"sv_login",
|
||||||
|
"maxwait",
|
||||||
|
"maxwait_us",
|
||||||
|
] {
|
||||||
|
stats.insert(stat, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EventName::UpdateAverages => {
|
||||||
|
// Calculate averages
|
||||||
|
for stat in &[
|
||||||
|
"avg_query_count",
|
||||||
|
"avgxact_count",
|
||||||
|
"avg_sent",
|
||||||
|
"avg_received",
|
||||||
|
"avg_wait_time",
|
||||||
|
] {
|
||||||
|
let total_name = stat.replace("avg_", "total_");
|
||||||
|
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||||
|
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||||
|
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
||||||
|
|
||||||
|
stats.insert(stat, avg);
|
||||||
|
*old_value = new_value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a snapshot of statistics. Updated once a second
|
||||||
|
/// by the `Collector`.
|
||||||
|
pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
|
||||||
|
LATEST_STATS.lock().clone()
|
||||||
|
}
|
||||||
|
|||||||
28
tests/pgbench/simple.sql
Normal file
28
tests/pgbench/simple.sql
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
|
||||||
|
-- \setrandom aid 1 :naccounts
|
||||||
|
\set aid random(1, 100000)
|
||||||
|
-- \setrandom bid 1 :nbranches
|
||||||
|
\set bid random(1, 100000)
|
||||||
|
-- \setrandom tid 1 :ntellers
|
||||||
|
\set tid random(1, 100000)
|
||||||
|
-- \setrandom delta -5000 5000
|
||||||
|
\set delta random(-5000,5000)
|
||||||
|
|
||||||
|
\set shard random(0, 2)
|
||||||
|
|
||||||
|
SET SHARD TO :shard;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
|
||||||
|
|
||||||
|
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
|
||||||
|
|
||||||
|
UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
|
||||||
|
|
||||||
|
UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
|
||||||
|
|
||||||
|
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
|
||||||
|
|
||||||
|
END;
|
||||||
|
|
||||||
Reference in New Issue
Block a user