Compare commits

..

18 Commits

Author SHA1 Message Date
Lev Kokotov
341ebf4123 docs and remove Option (#58)
* docs and remove Option

* lint
2022-03-07 23:05:40 -08:00
Lev Kokotov
35828a0a8c Per-shard statistics (#57)
* per shard stats

* aight

* cleaner

* fix show lists

* comments

* more friendly

* case-insensitive

* test all shards

* ok

* HUH?
2022-03-04 17:04:27 -08:00
Lev Kokotov
1e8fa110ae Fix pgbouncerhero (#54) 2022-03-02 14:46:31 -08:00
Lev Kokotov
d4186b7815 More admin (#53)
* more admin

* more admin

* show lists

* tests
2022-03-01 22:49:43 -08:00
Lev Kokotov
aaeef69d59 Refactor admin (#52) 2022-03-01 08:47:19 -08:00
Lev Kokotov
b21e0f4a7e admin SHOW DATABASES (#51)
* admin SHOW DATABASES

* test

* correct replica count
2022-02-28 17:22:28 -08:00
Lev Kokotov
eb1473060e admin: SHOW CONFIG (#50)
* admin: SHOW CONFIG

* test
2022-02-28 08:14:39 -08:00
Lev Kokotov
26f75f8d5d admin RELOAD (#49)
* admin RELOAD

* test
2022-02-27 10:21:24 -08:00
Lev Kokotov
99d65fc475 Check server versions on startup & refactor (#48)
* Refactor and check server parameters

* warnings

* fix validator
2022-02-26 11:01:52 -08:00
Lev Kokotov
206fdc9769 Fix some stats (#47)
* fix some stats

* use constant

* lint
2022-02-26 10:03:11 -08:00
Lev Kokotov
f74101cdfe admin: SHOW STATS (#46)
* admin: show stats

* warning

* tests

* lint

* type mod
2022-02-25 18:20:15 -08:00
Lev Kokotov
8e0682482d query routing docs (#45) 2022-02-25 14:27:33 -08:00
Lev Kokotov
6db51b4a11 Use Toxiproxy for failover testing (#44)
* Toxiproxy

* up-to-date config

* debug

* hm

* more

* mroe

* more

* hmm

* aha

* less logs

* cleaner

* hmm

* we test these now

* update readme
2022-02-24 20:55:19 -08:00
Lev Kokotov
a784883611 Allow to set shard and set sharding key without quotes (#43)
* Allow to set shard and set sharding key without quotes

* cover it

* dont look for these in the middle of another query

* friendly regex

* its own response to set shard key
2022-02-24 12:16:24 -08:00
Lev Kokotov
5972b6fa52 Switch to parking_lot RwLock & Mutex. Use trace! for protocol instead of debug! (#42)
* RwLock & parking_lot::Mutex

* upgrade to trace
2022-02-24 08:44:41 -08:00
Lev Kokotov
b3c8ca4b8a Another example of a sharding function (#41)
* Another example of a sharding function

* tests
2022-02-23 11:50:34 -08:00
Lev Kokotov
dce72ba262 Add debug logging (#39)
* Add debug for easier debugging

* fmt

* a couple more messages
2022-02-22 19:26:08 -08:00
Lev Kokotov
af1716bcd7 Flush stats (#38)
* flush stats

* stats

* refactor
2022-02-22 18:10:30 -08:00
18 changed files with 1536 additions and 331 deletions

108
.circleci/pgcat.toml Normal file

@@ -0,0 +1,108 @@
#
# PgCat config example.
#
#
# General pooler settings
[general]
# What IP to run on, 0.0.0.0 means accessible from everywhere.
host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = 6432
# How many connections to allocate per server.
pool_size = 15
# Pool mode (see PgBouncer docs for more).
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
# How long to wait before aborting a server connection (ms).
connect_timeout = 100
# How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout = 100
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
# Stats will be sent here
statsd_address = "127.0.0.1:8125"
#
# User to use for authentication against the server.
[user]
name = "sharding_user"
password = "sharding_user"
#
# Shards in the cluster
[shards]
# Shard 0
[shards.0]
# [ host, port, role ]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5433, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
# Database name (e.g. "postgres")
database = "shard0"
[shards.1]
# [ host, port, role ]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5433, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
database = "shard1"
[shards.2]
# [ host, port, role ]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5433, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
database = "shard2"
# Settings for our query routing layer.
[query_router]
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"
# Query parser. If enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = false
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"
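For intuition, here is a minimal Rust sketch of what a `sha1`-style sharding function could look like: hash the sharding key, fold part of the digest into an integer, and take it modulo the shard count. This is an illustration only, not pgcat's actual implementation — the function name `sha1_shard` and the digest-folding scheme are assumptions, and `pg_bigint_hash` in particular follows Postgres' `PARTITION BY HASH` algorithm, which is more involved.

```rust
// Hypothetical sketch of a sha1-based sharding function; pgcat's real
// implementation may fold the digest differently.
use sha1::{Digest, Sha1};

/// Map a bigint sharding key to a shard index in [0, shards).
fn sha1_shard(key: i64, shards: usize) -> usize {
    // Hash the decimal representation of the key.
    let digest = Sha1::digest(key.to_string().as_bytes());

    // Fold the first 8 bytes of the 20-byte digest into a u64.
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&digest[..8]);

    (u64::from_be_bytes(buf) % shards as u64) as usize
}

fn main() {
    // With 3 shards configured, every key deterministically lands on one shard.
    for key in [1i64, 2, 3, 100] {
        println!("key {} -> shard {}", key, sha1_shard(key, 3));
    }
}
```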


@@ -3,19 +3,36 @@
set -e
set -o xtrace
# Start PgCat with a particular log level
# for inspection.
function start_pgcat() {
kill -s SIGINT $(pgrep pgcat) || true
RUST_LOG=${1} ./target/debug/pgcat .circleci/pgcat.toml &
sleep 1
}
# Setup the database with shards and user
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
-./target/debug/pgcat &
# Install Toxiproxy to simulate a downed/slow database
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
sudo dpkg -i toxiproxy-2.1.4.deb
# Start Toxiproxy
toxiproxy-server &
sleep 1
-# Setup PgBench
# Create a database at port 5433, forward it to Postgres
toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica
start_pgcat "info"
# pgbench test
pgbench -i -h 127.0.0.1 -p 6432
-pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
# Run it
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple
# Extended protocol
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
# COPY TO STDOUT test
@@ -35,18 +52,48 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
#
-# ActiveRecord tests!
# ActiveRecord tests
#
-cd tests/ruby
-sudo gem install bundler
-bundle install
-ruby tests.rb
cd tests/ruby && \
sudo gem install bundler && \
bundle install && \
ruby tests.rb && \
cd ../..
# Admin tests
psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
# Start PgCat in debug to demonstrate failover better
start_pgcat "debug"
# Add latency to the replica at port 5433 slightly above the healthcheck timeout
toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica
sleep 1
# Note the failover in the logs
timeout 5 psql -e -h 127.0.0.1 -p 6432 <<-EOF
SELECT 1;
SELECT 1;
SELECT 1;
EOF
# Remove latency
toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica
start_pgcat "info"
-cd ../../
# Test session mode (and config reload)
sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' pgcat.toml
-# Reload config
# Reload config test
kill -SIGHUP $(pgrep pgcat)
# Prepared statements that will only work in session mode

11
Cargo.lock generated

@@ -364,13 +364,13 @@ dependencies = [
"md-5", "md-5",
"num_cpus", "num_cpus",
"once_cell", "once_cell",
"parking_lot",
"rand", "rand",
"regex", "regex",
"serde", "serde",
"serde_derive", "serde_derive",
"sha-1", "sha-1",
"sqlparser", "sqlparser",
"statsd",
"tokio", "tokio",
"toml", "toml",
] ]
@@ -541,15 +541,6 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "statsd"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1efceb4bf2c0b5ebec94354285a43bbbed1375605bdf2ebe4132299434a330"
dependencies = [
"rand",
]
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.86" version = "1.0.86"


@@ -20,8 +20,8 @@ serde_derive = "1"
regex = "1" regex = "1"
num_cpus = "1" num_cpus = "1"
once_cell = "1" once_cell = "1"
statsd = "0.15"
sqlparser = "0.14" sqlparser = "0.14"
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"
env_logger = "0.9" env_logger = "0.9"
parking_lot = "0.11"


@@ -11,14 +11,14 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su
## Features
| **Feature** | **Status** | **Comments** |
|--------------------------------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Transaction pooling | :heavy_check_mark: | Identical to PgBouncer. |
-| Session pooling | :heavy_check_mark: | Identical to PgBouncer. |
-| `COPY` support | :heavy_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
-| Query cancellation | :heavy_check_mark: | Supported both in transaction and session pooling modes. |
-| Load balancing of read queries | :heavy_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
-| Sharding | :heavy_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
-| Failover | :heavy_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
-| Statistics reporting | :heavy_check_mark: | Statistics similar to PgBouncers are reported via StatsD. |
| Transaction pooling | :white_check_mark: | Identical to PgBouncer. |
| Session pooling | :white_check_mark: | Identical to PgBouncer. |
| `COPY` support | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
| Query cancellation | :white_check_mark: | Supported both in transaction and session pooling modes. |
| Load balancing of read queries | :white_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
| Statistics reporting | :white_check_mark: | Statistics similar to PgBouncers are reported via StatsD. |
| Live configuration reloading | :construction_worker: | Reload config with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)`. Not all settings can be reloaded without a restart. |
| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. |
@@ -75,15 +75,15 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing.
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
-| Transaction pooling | :heavy_check_mark: | :heavy_check_mark: | Used by default for all tests. |
-| Session pooling | :heavy_check_mark: | :heavy_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. |
-| `COPY` | :heavy_check_mark: | :heavy_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
-| Query cancellation | :heavy_check_mark: | :heavy_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
-| Load balancing | :x: | :heavy_check_mark: | We could test this by emitting statistics for each replica and compare them. |
-| Failover | :x: | :heavy_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. |
-| Sharding | :heavy_check_mark: | :heavy_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
-| Statistics reporting | :x: | :heavy_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |
-| Live config reloading | :heavy_check_mark: | :heavy_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
| Session pooling | :white_check_mark: | :white_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. |
| `COPY` | :white_check_mark: | :white_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
| Query cancellation | :white_check_mark: | :white_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
| Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. |
| Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. |
| Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
| Statistics reporting | :x: | :white_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |
| Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
## Usage
@@ -133,6 +133,31 @@ All servers are checked with a `SELECT 1` query before being given to a client.
The ban time can be changed with `ban_time`. The default is 60 seconds.
Failover behavior can get pretty interesting (read complex) when multiple configurations and factors are involved. The table below will try to explain what PgCat does in each scenario:
| **Query** | **`SET SERVER ROLE TO`** | **`query_parser_enabled`** | **`primary_reads_enabled`** | **Target state** | **Outcome** |
|---------------------------|--------------------------|----------------------------|-----------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Read query, i.e. `SELECT` | unset (any) | false | false | up | Query is routed to the first instance in the round-robin loop. |
| Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the round-robin loop. |
| Read query | unset (any) | true | true | up | Query is routed to the first instance in the round-robin loop. |
| Read query | replica | false | false | up | Query is routed to the first replica instance in the round-robin loop. |
| Read query | primary | false | false | up | Query is routed to the primary. |
| Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the round-robin loop is attempted. |
| Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
| Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the round-robin loop. |
| Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
| Read query | primary | false | false | down | The query is attempted against the primary and fails. The client receives an error. |
| | | | | | |
| Write query e.g. `INSERT` | unset (any) | false | false | up | The query is attempted against the first available instance in the round-robin loop. If the instance is a replica, the query fails and the client receives an error. |
| Write query | unset (any) | true | false | up | The query is routed to the primary. |
| Write query | unset (any) | true | true | up | The query is routed to the primary. |
| Write query | primary | false | false | up | The query is routed to the primary. |
| Write query | replica | false | false | up | The query is routed to the replica and fails. The client receives an error. |
| Write query | unset (any) | true | false | down | The query is routed to the primary and fails. The client receives an error. |
| Write query | unset (any) | true | true | down | The query is routed to the primary and fails. The client receives an error. |
| Write query | primary | false | false | down | The query is routed to the primary and fails. The client receives an error. |
| | | | | | |
### Sharding
We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
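To make the failover rules in the table above concrete, here is a small self-contained Rust sketch of the ban-and-retry idea: a failed `SELECT 1` health check bans an instance for `ban_time`, and the round-robin loop skips banned instances until the ban expires. The names here (`Banlist`, `is_banned`) are illustrative stand-ins, not pgcat's internal API.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Illustrative stand-in for pgcat's banlist; not the actual internal API.
struct Banlist {
    banned: HashMap<String, Instant>, // address -> when it was banned
    ban_time: Duration,               // the `ban_time` config setting
}

impl Banlist {
    /// Ban an address after a failed health check.
    fn ban(&mut self, addr: &str) {
        self.banned.insert(addr.to_string(), Instant::now());
    }

    /// Check whether an address is still banned; expired bans are lifted.
    fn is_banned(&mut self, addr: &str) -> bool {
        if let Some(when) = self.banned.get(addr) {
            if when.elapsed() < self.ban_time {
                return true;
            }
        }
        // Never banned, or the ban expired.
        self.banned.remove(addr);
        false
    }
}

fn main() {
    let mut banlist = Banlist {
        banned: HashMap::new(),
        ban_time: Duration::from_secs(60),
    };
    banlist.ban("127.0.0.1:5433");

    // Round-robin over candidates, skipping banned instances.
    for addr in ["127.0.0.1:5433", "127.0.0.1:5432"] {
        if !banlist.is_banned(addr) {
            println!("routing query to {}", addr);
            break;
        }
    }
}
```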


@@ -96,3 +96,13 @@ query_parser_enabled = false
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"

386
src/admin.rs Normal file

@@ -0,0 +1,386 @@
use bytes::{Buf, BufMut, BytesMut};
use log::{info, trace};
use tokio::net::tcp::OwnedWriteHalf;
use std::collections::HashMap;
use crate::config::{get_config, parse};
use crate::errors::Error;
use crate::messages::*;
use crate::pool::ConnectionPool;
use crate::stats::get_stats;
/// Handle admin client
pub async fn handle_admin(
stream: &mut OwnedWriteHalf,
mut query: BytesMut,
pool: ConnectionPool,
) -> Result<(), Error> {
let code = query.get_u8() as char;
if code != 'Q' {
return Err(Error::ProtocolSyncError);
}
let len = query.get_i32() as usize;
let query = String::from_utf8_lossy(&query[..len - 5])
.to_string()
.to_ascii_uppercase();
trace!("Admin query: {}", query);
if query.starts_with("SHOW STATS") {
trace!("SHOW STATS");
show_stats(stream, &pool).await
} else if query.starts_with("RELOAD") {
trace!("RELOAD");
reload(stream).await
} else if query.starts_with("SHOW CONFIG") {
trace!("SHOW CONFIG");
show_config(stream).await
} else if query.starts_with("SHOW DATABASES") {
trace!("SHOW DATABASES");
show_databases(stream, &pool).await
} else if query.starts_with("SHOW POOLS") {
trace!("SHOW POOLS");
show_pools(stream, &pool).await
} else if query.starts_with("SHOW LISTS") {
trace!("SHOW LISTS");
show_lists(stream, &pool).await
} else if query.starts_with("SHOW VERSION") {
trace!("SHOW VERSION");
show_version(stream).await
} else if query.starts_with("SET ") {
trace!("SET");
ignore_set(stream).await
} else {
error_response(stream, "Unsupported query against the admin database").await
}
}
/// SHOW LISTS
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats();
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
let mut res = BytesMut::new();
res.put(row_description(&columns));
res.put(data_row(&vec![
"databases".to_string(),
(pool.databases() + 1).to_string(), // see comment below
]));
res.put(data_row(&vec!["users".to_string(), "1".to_string()]));
res.put(data_row(&vec![
"pools".to_string(),
(pool.databases() + 1).to_string(), // +1 for the pgbouncer admin db pool which isn't real
])); // but admin tools that work with pgbouncer want this
res.put(data_row(&vec![
"free_clients".to_string(),
stats
.keys()
.map(|address_id| stats[&address_id]["cl_idle"])
.sum::<i64>()
.to_string(),
]));
res.put(data_row(&vec![
"used_clients".to_string(),
stats
.keys()
.map(|address_id| stats[&address_id]["cl_active"])
.sum::<i64>()
.to_string(),
]));
res.put(data_row(&vec![
"login_clients".to_string(),
"0".to_string(),
]));
res.put(data_row(&vec![
"free_servers".to_string(),
stats
.keys()
.map(|address_id| stats[&address_id]["sv_idle"])
.sum::<i64>()
.to_string(),
]));
res.put(data_row(&vec![
"used_servers".to_string(),
stats
.keys()
.map(|address_id| stats[&address_id]["sv_active"])
.sum::<i64>()
.to_string(),
]));
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
res.put(data_row(&vec!["dns_queries".to_string(), "0".to_string()]));
res.put(data_row(&vec!["dns_pending".to_string(), "0".to_string()]));
res.put(command_complete("SHOW"));
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, res).await
}
/// SHOW VERSION
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let mut res = BytesMut::new();
res.put(row_description(&vec![("version", DataType::Text)]));
res.put(data_row(&vec!["PgCat 0.1.0".to_string()]));
res.put(command_complete("SHOW"));
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, res).await
}
/// SHOW POOLS
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats();
let config = {
let guard = get_config();
&*guard.clone()
};
let columns = vec![
("database", DataType::Text),
("user", DataType::Text),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric),
("sv_active", DataType::Numeric),
("sv_idle", DataType::Numeric),
("sv_used", DataType::Numeric),
("sv_tested", DataType::Numeric),
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
("pool_mode", DataType::Text),
];
let mut res = BytesMut::new();
res.put(row_description(&columns));
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let stats = match stats.get(&address.id) {
Some(stats) => stats.clone(),
None => HashMap::new(),
};
let mut row = vec![address.name(), config.user.name.clone()];
for column in &columns[2..columns.len() - 1] {
let value = stats.get(column.0).unwrap_or(&0).to_string();
row.push(value);
}
row.push(config.general.pool_mode.to_string());
res.put(data_row(&row));
}
}
res.put(command_complete("SHOW"));
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, res).await
}
/// SHOW DATABASES
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let guard = get_config();
let config = &*guard.clone();
drop(guard);
// Columns
let columns = vec![
("name", DataType::Text),
("host", DataType::Text),
("port", DataType::Text),
("database", DataType::Text),
("force_user", DataType::Text),
("pool_size", DataType::Int4),
("min_pool_size", DataType::Int4),
("reserve_pool", DataType::Int4),
("pool_mode", DataType::Text),
("max_connections", DataType::Int4),
("current_connections", DataType::Int4),
("paused", DataType::Int4),
("disabled", DataType::Int4),
];
let mut res = BytesMut::new();
// RowDescription
res.put(row_description(&columns));
for shard in 0..pool.shards() {
let database_name = &config.shards[&shard.to_string()].database;
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server);
res.put(data_row(&vec![
address.name(), // name
address.host.to_string(), // host
address.port.to_string(), // port
database_name.to_string(), // database
config.user.name.to_string(), // force_user
config.general.pool_size.to_string(), // pool_size
"0".to_string(), // min_pool_size
"0".to_string(), // reserve_pool
config.general.pool_mode.to_string(), // pool_mode
config.general.pool_size.to_string(), // max_connections
pool_state.connections.to_string(), // current_connections
"0".to_string(), // paused
"0".to_string(), // disabled
]));
}
}
res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, res).await
}
/// Ignore any SET commands the client sends.
/// This is common initialization done by ORMs.
async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
custom_protocol_response_ok(stream, "SET").await
}
/// RELOAD
async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
info!("Reloading config");
let config = get_config();
let path = config.path.clone().unwrap();
parse(&path).await?;
let config = get_config();
config.show();
let mut res = BytesMut::new();
// CommandComplete
res.put(command_complete("RELOAD"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, res).await
}
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let guard = get_config();
let config = &*guard.clone();
let config: HashMap<String, String> = config.into();
drop(guard);
// Configs that cannot be changed dynamically.
let immutables = ["host", "port", "connect_timeout"];
// Columns
let columns = vec![
("key", DataType::Text),
("value", DataType::Text),
("default", DataType::Text),
("changeable", DataType::Text),
];
// Response data
let mut res = BytesMut::new();
res.put(row_description(&columns));
// DataRow rows
for (key, value) in config {
let changeable = if immutables.iter().filter(|col| *col == &key).count() == 1 {
"no".to_string()
} else {
"yes".to_string()
};
let row = vec![key, value, "-".to_string(), changeable];
res.put(data_row(&row));
}
res.put(command_complete("SHOW"));
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, res).await
}
/// SHOW STATS
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let columns = vec![
("database", DataType::Text),
("total_xact_count", DataType::Numeric),
("total_query_count", DataType::Numeric),
("total_received", DataType::Numeric),
("total_sent", DataType::Numeric),
("total_xact_time", DataType::Numeric),
("total_query_time", DataType::Numeric),
("total_wait_time", DataType::Numeric),
("avg_xact_count", DataType::Numeric),
("avg_query_count", DataType::Numeric),
("avg_recv", DataType::Numeric),
("avg_sent", DataType::Numeric),
("avg_xact_time", DataType::Numeric),
("avg_query_time", DataType::Numeric),
("avg_wait_time", DataType::Numeric),
];
let stats = get_stats();
let mut res = BytesMut::new();
res.put(row_description(&columns));
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let stats = match stats.get(&address.id) {
Some(stats) => stats.clone(),
None => HashMap::new(),
};
let mut row = vec![address.name()];
for column in &columns[1..] {
row.push(stats.get(column.0).unwrap_or(&0).to_string());
}
res.put(data_row(&row));
}
}
res.put(command_complete("SHOW"));
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, res).await
}
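A note on the recurring three-byte tail in the handlers above: `b'Z'`, `5`, `b'I'` is the Postgres ReadyForQuery message, spelled out by hand. The helper below only restates what those three `put` calls do; it does not exist in the file above.

```rust
use bytes::{BufMut, BytesMut};

// Sketch of the ReadyForQuery ('Z') message the admin handlers append
// by hand; this helper is illustrative and not part of src/admin.rs.
fn ready_for_query_idle() -> BytesMut {
    let mut res = BytesMut::new();
    res.put_u8(b'Z'); // message type: ReadyForQuery
    res.put_i32(5);   // length: the 4-byte length field itself + 1 status byte
    res.put_u8(b'I'); // transaction status: 'I' = idle
    res
}
```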


@@ -2,7 +2,7 @@
/// We are pretending to the server in this scenario,
/// and this module implements that.
use bytes::{Buf, BufMut, BytesMut};
-use log::error;
use log::{debug, error, trace};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -11,6 +11,7 @@ use tokio::net::{
use std::collections::HashMap;
use crate::admin::handle_admin;
use crate::config::get_config;
use crate::constants::*;
use crate::errors::Error;
@@ -54,6 +55,15 @@ pub struct Client {
// Statistics
stats: Reporter,
// Clients want to talk to admin
admin: bool,
// Last address the client talked to
last_address_id: Option<usize>,
// Last server process id we talked to
last_server_id: Option<i32>,
}
impl Client {
@@ -70,6 +80,8 @@ impl Client {
let transaction_mode = config.general.pool_mode.starts_with("t");
drop(config);
loop {
trace!("Waiting for StartupMessage");
// Could be StartupMessage or SSLRequest
// which makes this variable length.
let len = match stream.read_i32().await {
@@ -91,6 +103,8 @@ impl Client {
match code {
// Client wants SSL. We don't support it at the moment.
SSL_REQUEST_CODE => {
trace!("Rejecting SSLRequest");
let mut no = BytesMut::with_capacity(1);
no.put_u8(b'N');
@@ -99,6 +113,8 @@ impl Client {
// Regular startup message.
PROTOCOL_VERSION_NUMBER => {
trace!("Got StartupMessage");
// TODO: perform actual auth.
let parameters = parse_startup(bytes.clone())?;
@@ -110,6 +126,16 @@ impl Client {
write_all(&mut stream, server_info).await?;
backend_key_data(&mut stream, process_id, secret_key).await?;
ready_for_query(&mut stream).await?;
trace!("Startup OK");
let database = parameters
.get("database")
.unwrap_or(parameters.get("user").unwrap());
let admin = ["pgcat", "pgbouncer"]
.iter()
.filter(|db| *db == &database)
.count()
== 1;
// Split the read and write streams
// so we can control buffering.
@@ -126,6 +152,9 @@ impl Client {
client_server_map: client_server_map,
parameters: parameters,
stats: stats,
admin: admin,
last_address_id: None,
last_server_id: None,
});
}
@@ -147,6 +176,9 @@ impl Client {
client_server_map: client_server_map,
parameters: HashMap::new(),
stats: stats,
admin: false,
last_address_id: None,
last_server_id: None,
});
}
@@ -161,8 +193,10 @@ impl Client {
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
trace!("Sending CancelRequest");
let (process_id, secret_key, address, port) = {
-let guard = self.client_server_map.lock().unwrap();
let guard = self.client_server_map.lock();
match guard.get(&(self.process_id, self.secret_key)) {
// Drop the mutex as soon as possible.
@@ -193,8 +227,7 @@ impl Client {
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocols.
loop {
-// Client idle, waiting for messages.
-self.stats.client_idle(self.process_id);
trace!("Client idle, waiting for message");
// Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol).
@@ -203,6 +236,19 @@ impl Client {
// SET SHARDING KEY TO 'bigint';
let mut message = read_message(&mut self.read).await?;
// Avoid taking a server if the client just wants to disconnect.
if message[0] as char == 'X' {
trace!("Client disconnecting");
return Ok(());
}
// Handle admin database real quick
if self.admin {
trace!("Handling admin command");
handle_admin(&mut self.write, message, pool.clone()).await?;
continue;
}
// Handle all custom protocol commands here.
match query_router.try_execute_command(message.clone()) {
// Normal query
@@ -212,11 +258,17 @@ impl Client {
}
}
-Some((Command::SetShard, _)) | Some((Command::SetShardingKey, _)) => {
Some((Command::SetShard, _)) => {
custom_protocol_response_ok(&mut self.write, &format!("SET SHARD")).await?;
continue;
}
Some((Command::SetShardingKey, _)) => {
custom_protocol_response_ok(&mut self.write, &format!("SET SHARDING KEY"))
.await?;
continue;
}
Some((Command::SetServerRole, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
continue;
@@ -247,12 +299,17 @@ impl Client {
continue;
}
-// Waiting for server connection.
-self.stats.client_waiting(self.process_id);
debug!("Waiting for connection from pool");
// Grab a server from the pool: the client issued a regular query.
-let connection = match pool.get(query_router.shard(), query_router.role()).await {
let connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id)
.await
{
-Ok(conn) => conn,
Ok(conn) => {
debug!("Got connection from pool");
conn
}
Err(err) => {
error!("Could not get connection from pool: {:?}", err);
error_response(&mut self.write, "could not get connection from the pool")
@@ -262,21 +319,37 @@ impl Client {
};
let mut reference = connection.0;
-let _address = connection.1;
let address = connection.1;
let server = &mut *reference;
// Claim this server as mine for query cancellation.
server.claim(self.process_id, self.secret_key);
// "disconnect" from the previous server stats-wise
if let Some(last_address_id) = self.last_address_id {
self.stats
.client_disconnecting(self.process_id, last_address_id);
}
// Client active & server active
-self.stats.client_active(self.process_id);
-self.stats.server_active(server.process_id());
self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id(), address.id);
self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id());
debug!(
"Client {:?} talking to server {:?}",
self.write.peer_addr().unwrap(),
server.address()
);
// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode.
loop {
let mut message = if message.len() == 0 {
trace!("Waiting for message inside transaction or in session mode");
match read_message(&mut self.read).await {
Ok(message) => message,
Err(err) => {
@@ -303,9 +376,13 @@ impl Client {
let code = message.get_u8() as char;
let _len = message.get_i32() as usize;
trace!("Message: {}", code);
match code {
// ReadyForQuery
'Q' => {
debug!("Sending query to server");
// TODO: implement retries here for read-only transactions.
server.send(original).await?;
@@ -330,17 +407,17 @@ impl Client {
}
// Report query executed statistics.
-self.stats.query();
self.stats.query(self.process_id, address.id);
// The transaction is over, we can release the connection back to the pool.
if !server.in_transaction() {
// Report transaction executed statistics.
-self.stats.transaction();
self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
-self.stats.server_idle(server.process_id());
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
@@ -387,6 +464,8 @@ impl Client {
// Sync
// Frontend (client) is asking for the query result now.
'S' => {
debug!("Sending query to server");
self.buffer.put(&original[..]);
// TODO: retries for read-only transactions.
@@ -414,15 +493,15 @@ impl Client {
}
// Report query executed statistics.
-self.stats.query();
self.stats.query(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() {
-self.stats.transaction();
self.stats.transaction(self.process_id, address.id);
if self.transaction_mode {
-self.stats.server_idle(server.process_id());
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
@@ -453,10 +532,10 @@ impl Client {
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() {
-self.stats.transaction();
self.stats.transaction(self.process_id, address.id);
if self.transaction_mode {
-self.stats.server_idle(server.process_id());
self.stats.server_idle(server.process_id(), address.id);
break;
}
}
@@ -471,19 +550,29 @@ impl Client {
}
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
self.release();
self.stats.client_idle(self.process_id, address.id);
}
}
/// Release the server from being mine. I can't cancel its queries anymore.
pub fn release(&self) {
-let mut guard = self.client_server_map.lock().unwrap();
let mut guard = self.client_server_map.lock();
guard.remove(&(self.process_id, self.secret_key));
}
}
impl Drop for Client {
fn drop(&mut self) {
-self.stats.client_disconnecting(self.process_id);
// Disconnect the client
if let Some(address_id) = self.last_address_id {
self.stats.client_disconnecting(self.process_id, address_id);
// The server is now idle
if let Some(process_id) = self.last_server_id {
self.stats.server_idle(process_id, address_id);
}
}
}
}


@@ -19,6 +19,15 @@ pub enum Role {
Replica,
}
impl ToString for Role {
fn to_string(&self) -> String {
match *self {
Role::Primary => "primary".to_string(),
Role::Replica => "replica".to_string(),
}
}
}
impl PartialEq<Option<Role>> for Role {
fn eq(&self, other: &Option<Role>) -> bool {
match other {
@@ -39,23 +48,37 @@ impl PartialEq<Role> for Option<Role> {
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
pub struct Address {
pub id: usize,
pub host: String,
pub port: String,
pub shard: usize,
pub role: Role,
pub replica_number: usize,
}
impl Default for Address {
fn default() -> Address {
Address {
id: 0,
host: String::from("127.0.0.1"),
port: String::from("5432"),
shard: 0,
replica_number: 0,
role: Role::Replica,
}
}
}
impl Address {
pub fn name(&self) -> String {
match self.role {
Role::Primary => format!("shard_{}_primary", self.shard),
Role::Replica => format!("shard_{}_replica_{}", self.shard, self.replica_number),
}
}
}
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
pub struct User {
pub name: String,
@@ -118,6 +141,7 @@ pub struct QueryRouter {
pub default_role: String,
pub query_parser_enabled: bool,
pub primary_reads_enabled: bool,
pub sharding_function: String,
}
impl Default for QueryRouter {
@@ -126,12 +150,14 @@ impl Default for QueryRouter {
default_role: String::from("any"),
query_parser_enabled: false,
primary_reads_enabled: true,
sharding_function: "pg_bigint_hash".to_string(),
}
}
}
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub path: Option<String>,
pub general: General,
pub user: User,
pub shards: HashMap<String, Shard>,
@@ -141,6 +167,7 @@ pub struct Config {
impl Default for Config {
fn default() -> Config {
Config {
path: Some(String::from("pgcat.toml")),
general: General::default(),
user: User::default(),
shards: HashMap::from([(String::from("1"), Shard::default())]),
@@ -149,6 +176,52 @@ impl Default for Config {
}
}
impl From<&Config> for std::collections::HashMap<String, String> {
fn from(config: &Config) -> HashMap<String, String> {
HashMap::from([
("host".to_string(), config.general.host.to_string()),
("port".to_string(), config.general.port.to_string()),
(
"pool_size".to_string(),
config.general.pool_size.to_string(),
),
(
"pool_mode".to_string(),
config.general.pool_mode.to_string(),
),
(
"connect_timeout".to_string(),
config.general.connect_timeout.to_string(),
),
(
"healthcheck_timeout".to_string(),
config.general.healthcheck_timeout.to_string(),
),
("ban_time".to_string(), config.general.ban_time.to_string()),
(
"statsd_address".to_string(),
config.general.statsd_address.to_string(),
),
(
"default_role".to_string(),
config.query_router.default_role.to_string(),
),
(
"query_parser_enabled".to_string(),
config.query_router.query_parser_enabled.to_string(),
),
(
"primary_reads_enabled".to_string(),
config.query_router.primary_reads_enabled.to_string(),
),
(
"sharding_function".to_string(),
config.query_router.sharding_function.to_string(),
),
])
}
}
impl Config {
pub fn show(&self) {
info!("Pool size: {}", self.general.pool_size);
@@ -159,6 +232,8 @@ impl Config {
self.general.healthcheck_timeout
);
info!("Connection timeout: {}ms", self.general.connect_timeout);
info!("Sharding function: {}", self.query_router.sharding_function);
info!("Number of shards: {}", self.shards.len());
}
}
@@ -185,7 +260,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
}
};
-let config: Config = match toml::from_str(&contents) {
let mut config: Config = match toml::from_str(&contents) {
Ok(config) => config,
Err(err) => {
error!("Could not parse config file: {}", err.to_string());
@@ -193,6 +268,18 @@ pub async fn parse(path: &str) -> Result<(), Error> {
}
};
match config.query_router.sharding_function.as_ref() {
"pg_bigint_hash" => (),
"sha1" => (),
_ => {
error!(
"Supported sharding functions are: 'pg_bigint_hash', 'sha1', got: '{}'",
config.query_router.sharding_function
);
return Err(Error::BadConfig);
}
};
// Quick config sanity check.
for shard in &config.shards {
// We use addresses as unique identifiers,
@@ -263,6 +350,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
}
};
config.path = Some(path.to_string());
CONFIG.store(Arc::new(config.clone()));
Ok(())
@@ -280,5 +369,6 @@ mod test {
assert_eq!(get_config().shards["1"].servers[0].0, "127.0.0.1");
assert_eq!(get_config().shards["0"].servers[0].2, "primary");
assert_eq!(get_config().query_router.default_role, "any");
assert_eq!(get_config().path, Some("pgcat.toml".to_string()));
}
}
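The `From<&Config>` impl above is what backs the admin `SHOW CONFIG` command: the active config is flattened into string key/value pairs. A hedged usage sketch, assuming the `Config` type from this file is in scope:

```rust
use std::collections::HashMap;

// Usage sketch for the From<&Config> conversion above, mirroring what
// show_config() in src/admin.rs does with the result. Assumes `Config`
// from src/config.rs is in scope.
fn dump_config(config: &Config) {
    let settings: HashMap<String, String> = config.into();
    for (key, value) in settings {
        println!("{} = {}", key, value);
    }
}
```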


@@ -20,3 +20,8 @@ pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;
// ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows.
pub const MESSAGE_TERMINATOR: u8 = 0;
//
// Data types
//
pub const _OID_INT8: i32 = 20; // bigint


@@ -31,11 +31,11 @@ extern crate once_cell;
extern crate serde;
extern crate serde_derive;
extern crate sqlparser;
-extern crate statsd;
extern crate tokio;
extern crate toml;
use log::{error, info};
use parking_lot::Mutex;
use tokio::net::TcpListener;
use tokio::{
signal,
@@ -44,8 +44,9 @@ use tokio::{
};
use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
use std::sync::Arc;
mod admin;
mod client;
mod config;
mod constants;
@@ -112,14 +113,18 @@ async fn main() {
// Collect statistics and send them to StatsD
let (tx, rx) = mpsc::channel(100);
-tokio::task::spawn(async move {
-let mut stats_collector = Collector::new(rx);
-stats_collector.collect().await;
-});
// Connection pool for all shards and replicas
let mut pool =
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
let collector_tx = tx.clone();
let addresses = pool.databases();
tokio::task::spawn(async move {
let mut stats_collector = Collector::new(rx, collector_tx);
stats_collector.collect(addresses).await;
});
// Connect to all servers and validate their versions.
let server_info = match pool.validate().await {
Ok(info) => info,
Err(err) => {
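The change above moves pool construction ahead of the collector task so the collector can be seeded with the pool's addresses. The reporter/collector split itself is a standard mpsc pattern; below is a simplified, self-contained sketch where the `Event` enum and the aggregation logic are stand-ins for pgcat's actual `Reporter`/`Collector` types.

```rust
use std::collections::HashMap;
use tokio::sync::mpsc;

// Simplified stand-in for pgcat's stats events.
#[derive(Debug)]
enum Event {
    Query { address_id: usize },
    Transaction { address_id: usize },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100);

    // The collector task owns the receiving end and aggregates per address.
    let collector = tokio::task::spawn(async move {
        let mut totals: HashMap<usize, u64> = HashMap::new();
        while let Some(event) = rx.recv().await {
            match event {
                Event::Query { address_id } | Event::Transaction { address_id } => {
                    *totals.entry(address_id).or_insert(0) += 1;
                }
            }
        }
        totals
    });

    // Any number of clients clone the sender and report events.
    tx.send(Event::Query { address_id: 0 }).await.unwrap();
    tx.send(Event::Transaction { address_id: 0 }).await.unwrap();
    drop(tx); // closing all senders lets the collector finish

    println!("totals: {:?}", collector.await.unwrap());
}
```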


@@ -8,9 +8,26 @@ use tokio::net::{
TcpStream,
};
use crate::errors::Error;
use std::collections::HashMap;
-use crate::errors::Error;
/// Postgres data type mappings
/// used in RowDescription ('T') message.
pub enum DataType {
Text,
Int4,
Numeric,
}
impl From<&DataType> for i32 {
fn from(data_type: &DataType) -> i32 {
match data_type {
DataType::Text => 25,
DataType::Int4 => 23,
DataType::Numeric => 1700,
}
}
}
/// Tell the client that authentication handshake completed successfully.
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
@@ -38,7 +55,7 @@ pub async fn backend_key_data(
Ok(write_all(stream, key_data).await?)
}
-#[allow(dead_code)]
/// Construct a `Q`: Query message.
pub fn simple_query(query: &str) -> BytesMut {
let mut res = BytesMut::from(&b"Q"[..]);
let query = format!("{}\0", query);
@@ -91,9 +108,8 @@ pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Resu
}
}
-/// Parse StartupMessage parameters.
-/// e.g. user, database, application_name, etc.
-pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
/// Parse the params the server sends as a key/value format.
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
let mut result = HashMap::new();
let mut buf = Vec::new();
let mut tmp = String::new();
@@ -115,7 +131,7 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
// Expect pairs of name and value
// and at least one pair to be present.
-if buf.len() % 2 != 0 && buf.len() >= 2 {
if buf.len() % 2 != 0 || buf.len() < 2 {
return Err(Error::ClientBadStartup);
}
@@ -127,6 +143,14 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
i += 2;
}
Ok(result)
}
/// Parse StartupMessage parameters.
/// e.g. user, database, application_name, etc.
pub fn parse_startup(bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
let result = parse_params(bytes)?;
// Minimum required parameters
// I want to have the user at the very minimum, according to the protocol spec.
if !result.contains_key("user") {
@@ -252,68 +276,17 @@ pub async fn show_response(
// 3. CommandComplete
// 4. ReadyForQuery
-// RowDescription
-let mut row_desc = BytesMut::new();
-// Number of columns: 1
-row_desc.put_i16(1);
-// Column name
-row_desc.put_slice(&format!("{}\0", name).as_bytes());
-// Doesn't belong to any table
-row_desc.put_i32(0);
-// Doesn't belong to any table
-row_desc.put_i16(0);
-// Text
-row_desc.put_i32(25);
-// Text size = variable (-1)
-row_desc.put_i16(-1);
-// Type modifier: none that I know
-row_desc.put_i32(0);
-// Format being used: text (0), binary (1)
-row_desc.put_i16(0);
-// DataRow
-let mut data_row = BytesMut::new();
-// Number of columns
-data_row.put_i16(1);
-// Size of the column content (length of the string really)
-data_row.put_i32(value.len() as i32);
-// The content
-data_row.put_slice(value.as_bytes());
-// CommandComplete
-let mut command_complete = BytesMut::new();
-// Number of rows returned (just one)
-command_complete.put_slice(&b"SELECT 1\0"[..]);
// The final messages sent to the client
let mut res = BytesMut::new();
// RowDescription
-res.put_u8(b'T');
-res.put_i32(row_desc.len() as i32 + 4);
-res.put(row_desc);
res.put(row_description(&vec![(name, DataType::Text)]));
// DataRow
-res.put_u8(b'D');
-res.put_i32(data_row.len() as i32 + 4);
-res.put(data_row);
res.put(data_row(&vec![value.to_string()]));
// CommandComplete
-res.put_u8(b'C');
-res.put_i32(command_complete.len() as i32 + 4);
-res.put(command_complete);
res.put(command_complete("SELECT 1"));
// ReadyForQuery
res.put_u8(b'Z');
@@ -323,6 +296,77 @@ pub async fn show_response(
write_all_half(stream, res).await
}
pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
let mut res = BytesMut::new();
let mut row_desc = BytesMut::new();
// How many columns we are storing
row_desc.put_i16(columns.len() as i16);
for (name, data_type) in columns {
// Column name
row_desc.put_slice(&format!("{}\0", name).as_bytes());
// Doesn't belong to any table
row_desc.put_i32(0);
// Doesn't belong to any table
row_desc.put_i16(0);
// Data type OID of the column
row_desc.put_i32(data_type.into());
// Type size: -1 means variable-length
let type_size = match data_type {
DataType::Text => -1,
DataType::Int4 => 4,
DataType::Numeric => -1,
};
row_desc.put_i16(type_size);
// Type modifier: none that I know
row_desc.put_i32(-1);
// Format being used: text (0), binary (1)
row_desc.put_i16(0);
}
res.put_u8(b'T');
res.put_i32(row_desc.len() as i32 + 4);
res.put(row_desc);
res
}
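The DataType enum and its i32 conversion are not part of this diff; a plausible sketch that would satisfy the put_i32(data_type.into()) call above, using the standard Postgres type OIDs:

// Sketch of the assumed enum; the OIDs are the standard pg_type values.
#[derive(Debug, Clone, Copy)]
pub enum DataType {
    Text,
    Int4,
    Numeric,
}

impl From<&DataType> for i32 {
    fn from(data_type: &DataType) -> i32 {
        match data_type {
            DataType::Text => 25,      // TEXTOID
            DataType::Int4 => 23,      // INT4OID
            DataType::Numeric => 1700, // NUMERICOID
        }
    }
}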
pub fn data_row(row: &Vec<String>) -> BytesMut {
let mut res = BytesMut::new();
let mut data_row = BytesMut::new();
data_row.put_i16(row.len() as i16);
for column in row {
let column = column.as_bytes();
data_row.put_i32(column.len() as i32);
data_row.put_slice(&column);
}
res.put_u8(b'D');
res.put_i32(data_row.len() as i32 + 4);
res.put(data_row);
res
}
pub fn command_complete(command: &str) -> BytesMut {
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
let mut res = BytesMut::new();
res.put_u8(b'C');
res.put_i32(cmd.len() as i32 + 4);
res.put(cmd);
res
}
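All three helpers above share the same PostgreSQL wire framing: one message-type byte, an i32 length that counts itself plus the payload, then the payload. A generic sketch of that pattern (frame is illustrative, not part of this codebase):

use bytes::{BufMut, BytesMut};

fn frame(code: u8, payload: &[u8]) -> BytesMut {
    let mut msg = BytesMut::with_capacity(payload.len() + 5);
    msg.put_u8(code);
    msg.put_i32(payload.len() as i32 + 4); // the length field counts itself
    msg.put_slice(payload);
    msg
}

// command_complete("SELECT 1") above is equivalent to frame(b'C', b"SELECT 1\0").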
/// Write all data in the buffer to the TcpStream. /// Write all data in the buffer to the TcpStream.
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> { pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
match stream.write_all(&buf).await { match stream.write_all(&buf).await {

View File

@@ -3,7 +3,8 @@ use async_trait::async_trait;
use bb8::{ManageConnection, Pool, PooledConnection}; use bb8::{ManageConnection, Pool, PooledConnection};
use bytes::BytesMut; use bytes::BytesMut;
use chrono::naive::NaiveDateTime; use chrono::naive::NaiveDateTime;
use log::{error, info, warn}; use log::{debug, error, info, warn};
use parking_lot::{Mutex, RwLock};
use crate::config::{get_config, Address, Role, User}; use crate::config::{get_config, Address, Role, User};
use crate::errors::Error; use crate::errors::Error;
@@ -11,11 +12,11 @@ use crate::server::Server;
use crate::stats::Reporter; use crate::stats::Reporter;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
// Banlist: bad servers go in here. // Banlist: bad servers go in here.
pub type BanList = Arc<Mutex<Vec<HashMap<Address, NaiveDateTime>>>>; pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>; pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -37,6 +38,7 @@ impl ConnectionPool {
let mut shards = Vec::new(); let mut shards = Vec::new();
let mut addresses = Vec::new(); let mut addresses = Vec::new();
let mut banlist = Vec::new(); let mut banlist = Vec::new();
let mut address_id = 0;
let mut shard_ids = config let mut shard_ids = config
.shards .shards
.clone() .clone()
@@ -48,7 +50,8 @@ impl ConnectionPool {
for shard_idx in shard_ids { for shard_idx in shard_ids {
let shard = &config.shards[&shard_idx]; let shard = &config.shards[&shard_idx];
let mut pools = Vec::new(); let mut pools = Vec::new();
let mut replica_addresses = Vec::new(); let mut servers = Vec::new();
let mut replica_number = 0;
for server in shard.servers.iter() { for server in shard.servers.iter() {
let role = match server.2.as_ref() { let role = match server.2.as_ref() {
@@ -61,12 +64,20 @@ impl ConnectionPool {
}; };
let address = Address { let address = Address {
id: address_id,
host: server.0.clone(), host: server.0.clone(),
port: server.1.to_string(), port: server.1.to_string(),
role: role, role: role,
replica_number,
shard: shard_idx.parse::<usize>().unwrap(), shard: shard_idx.parse::<usize>().unwrap(),
}; };
address_id += 1;
if role == Role::Replica {
replica_number += 1;
}
let manager = ServerPool::new( let manager = ServerPool::new(
address.clone(), address.clone(),
config.user.clone(), config.user.clone(),
@@ -86,11 +97,11 @@ impl ConnectionPool {
.unwrap(); .unwrap();
pools.push(pool); pools.push(pool);
replica_addresses.push(address); servers.push(address);
} }
shards.push(pools); shards.push(pools);
addresses.push(replica_addresses); addresses.push(servers);
banlist.push(HashMap::new()); banlist.push(HashMap::new());
} }
@@ -101,7 +112,7 @@ impl ConnectionPool {
databases: shards, databases: shards,
addresses: addresses, addresses: addresses,
round_robin: rand::random::<usize>() % address_len, // Start at a random replica round_robin: rand::random::<usize>() % address_len, // Start at a random replica
banlist: Arc::new(Mutex::new(banlist)), banlist: Arc::new(RwLock::new(banlist)),
stats: stats, stats: stats,
} }
} }
@@ -114,9 +125,13 @@ impl ConnectionPool {
pub async fn validate(&mut self) -> Result<BytesMut, Error> { pub async fn validate(&mut self) -> Result<BytesMut, Error> {
let mut server_infos = Vec::new(); let mut server_infos = Vec::new();
let stats = self.stats.clone();
for shard in 0..self.shards() { for shard in 0..self.shards() {
for _ in 0..self.servers(shard) { for _ in 0..self.servers(shard) {
let connection = match self.get(shard, None).await { // To keep stats consistent.
let fake_process_id = 0;
let connection = match self.get(shard, None, fake_process_id).await {
Ok(conn) => conn, Ok(conn) => conn,
Err(err) => { Err(err) => {
error!("Shard {} down or misconfigured: {:?}", shard, err); error!("Shard {} down or misconfigured: {:?}", shard, err);
@@ -125,10 +140,24 @@ impl ConnectionPool {
}; };
let mut proxy = connection.0; let mut proxy = connection.0;
let _address = connection.1; let address = connection.1;
let server = &mut *proxy; let server = &mut *proxy;
server_infos.push(server.server_info()); let server_info = server.server_info();
stats.client_disconnecting(fake_process_id, address.id);
if server_infos.len() > 0 {
// Compare against the last server checked.
if server_info != server_infos[server_infos.len() - 1] {
warn!(
"{:?} has different server configuration than the last server",
address
);
}
}
server_infos.push(server_info);
} }
} }
@@ -146,6 +175,7 @@ impl ConnectionPool {
&mut self, &mut self,
shard: usize, shard: usize,
role: Option<Role>, role: Option<Role>,
process_id: i32,
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
let now = Instant::now(); let now = Instant::now();
let addresses = &self.addresses[shard]; let addresses = &self.addresses[shard];
@@ -161,6 +191,8 @@ impl ConnectionPool {
_ => addresses.len(), _ => addresses.len(),
}; };
debug!("Allowed attempts for {:?}: {}", role, allowed_attempts);
let exists = match role { let exists = match role {
Some(role) => addresses.iter().filter(|addr| addr.role == role).count() > 0, Some(role) => addresses.iter().filter(|addr| addr.role == role).count() > 0,
None => true, None => true,
@@ -179,6 +211,8 @@ impl ConnectionPool {
let index = self.round_robin % addresses.len(); let index = self.round_robin % addresses.len();
let address = &addresses[index]; let address = &addresses[index];
self.stats.client_waiting(process_id, address.id);
// Make sure you're getting a primary or a replica // Make sure you're getting a primary or a replica
// as per request. If no specific role is requested, the first // as per request. If no specific role is requested, the first
// available will be chosen. // available will be chosen.
@@ -198,6 +232,9 @@ impl ConnectionPool {
Err(err) => { Err(err) => {
error!("Banning replica {}, error: {:?}", index, err); error!("Banning replica {}, error: {:?}", index, err);
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
}; };
@@ -206,7 +243,7 @@ impl ConnectionPool {
let server = &mut *conn; let server = &mut *conn;
let healthcheck_timeout = get_config().general.healthcheck_timeout; let healthcheck_timeout = get_config().general.healthcheck_timeout;
self.stats.server_tested(server.process_id()); self.stats.server_tested(server.process_id(), address.id);
match tokio::time::timeout( match tokio::time::timeout(
tokio::time::Duration::from_millis(healthcheck_timeout), tokio::time::Duration::from_millis(healthcheck_timeout),
@@ -217,8 +254,9 @@ impl ConnectionPool {
// Check if health check succeeded // Check if health check succeeded
Ok(res) => match res { Ok(res) => match res {
Ok(_) => { Ok(_) => {
self.stats.checkout_time(now.elapsed().as_micros()); self.stats
self.stats.server_idle(conn.process_id()); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
Err(_) => { Err(_) => {
@@ -227,6 +265,9 @@ impl ConnectionPool {
server.mark_bad(); server.mark_bad();
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
}, },
@@ -237,6 +278,9 @@ impl ConnectionPool {
server.mark_bad(); server.mark_bad();
self.ban(address, shard); self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
continue; continue;
} }
} }
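A minimal sketch of the timeout pattern used in get() above; check stands in for server.query("SELECT 1") and is an assumption, not pgcat's API:

use std::future::Future;
use std::time::Duration;

async fn passes_healthcheck<F>(check: F, timeout_ms: u64) -> bool
where
    F: Future<Output = Result<(), ()>>,
{
    match tokio::time::timeout(Duration::from_millis(timeout_ms), check).await {
        // Returned in time and succeeded: hand the connection to the client.
        Ok(Ok(())) => true,
        // Errored or timed out: the caller bans the replica and tries the next one.
        Ok(Err(())) | Err(_) => false,
    }
}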
@@ -251,14 +295,14 @@ impl ConnectionPool {
pub fn ban(&self, address: &Address, shard: usize) { pub fn ban(&self, address: &Address, shard: usize) {
error!("Banning {:?}", address); error!("Banning {:?}", address);
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();
let mut guard = self.banlist.lock().unwrap(); let mut guard = self.banlist.write();
guard[shard].insert(address.clone(), now); guard[shard].insert(address.clone(), now);
} }
/// Clear the replica to receive traffic again. Takes effect immediately /// Clear the replica to receive traffic again. Takes effect immediately
/// for all new transactions. /// for all new transactions.
pub fn _unban(&self, address: &Address, shard: usize) { pub fn _unban(&self, address: &Address, shard: usize) {
let mut guard = self.banlist.lock().unwrap(); let mut guard = self.banlist.write();
guard[shard].remove(address); guard[shard].remove(address);
} }
@@ -274,12 +318,14 @@ impl ConnectionPool {
Some(Role::Primary) => return false, // Primary cannot be banned. Some(Role::Primary) => return false, // Primary cannot be banned.
}; };
// If you're not asking for the primary, debug!("Available targets for {:?}: {}", role, replicas_available);
// all databases are treated as replicas.
let mut guard = self.banlist.lock().unwrap(); let guard = self.banlist.read();
// Everything is banned = nothing is banned. // Everything is banned = nothing is banned.
if guard[shard].len() == replicas_available { if guard[shard].len() == replicas_available {
drop(guard);
let mut guard = self.banlist.write();
guard[shard].clear(); guard[shard].clear();
drop(guard); drop(guard);
warn!("Unbanning all replicas."); warn!("Unbanning all replicas.");
@@ -291,16 +337,24 @@ impl ConnectionPool {
Some(timestamp) => { Some(timestamp) => {
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();
let config = get_config(); let config = get_config();
// Ban expired. // Ban expired.
if now.timestamp() - timestamp.timestamp() > config.general.ban_time { if now.timestamp() - timestamp.timestamp() > config.general.ban_time {
drop(guard);
warn!("Unbanning {:?}", address);
let mut guard = self.banlist.write();
guard[shard].remove(address); guard[shard].remove(address);
false false
} else { } else {
debug!("{:?} is banned", address);
true true
} }
} }
None => false, None => {
debug!("{:?} is ok", address);
false
}
} }
} }
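A minimal sketch of the read-then-write pattern above: a plain parking_lot read guard cannot be upgraded in place, so it is dropped before the write lock is taken (parking_lot's upgradable_read would be an alternative):

use parking_lot::RwLock;
use std::collections::HashSet;

// Illustrative shape; pgcat's banlist is keyed per shard.
fn unban_all_if_everything_banned(banlist: &RwLock<HashSet<String>>, replicas: usize) {
    let guard = banlist.read();
    if guard.len() == replicas {
        // Release the read lock first; taking the write lock
        // while holding our own read guard would deadlock.
        drop(guard);
        banlist.write().clear();
    }
}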
@@ -311,6 +365,22 @@ impl ConnectionPool {
pub fn servers(&self, shard: usize) -> usize { pub fn servers(&self, shard: usize) -> usize {
self.addresses[shard].len() self.addresses[shard].len()
} }
pub fn databases(&self) -> usize {
let mut databases = 0;
for shard in 0..self.shards() {
databases += self.servers(shard);
}
databases
}
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
self.databases[shard][server].state()
}
pub fn address(&self, shard: usize, server: usize) -> &Address {
&self.addresses[shard][server]
}
} }
pub struct ServerPool { pub struct ServerPool {
@@ -348,13 +418,14 @@ impl ManageConnection for ServerPool {
async fn connect(&self) -> Result<Self::Connection, Self::Error> { async fn connect(&self) -> Result<Self::Connection, Self::Error> {
info!( info!(
"Creating a new connection to {:?} using user {:?}", "Creating a new connection to {:?} using user {:?}",
self.address, self.user.name self.address.name(),
self.user.name
); );
// Put a temporary process_id into the stats // Put a temporary process_id into the stats
// for server login. // for server login.
let process_id = rand::random::<i32>(); let process_id = rand::random::<i32>();
self.stats.server_login(process_id); self.stats.server_login(process_id, self.address.id);
match Server::startup( match Server::startup(
&self.address, &self.address,
@@ -367,12 +438,12 @@ impl ManageConnection for ServerPool {
{ {
Ok(conn) => { Ok(conn) => {
// Remove the temporary process_id from the stats. // Remove the temporary process_id from the stats.
self.stats.server_disconnecting(process_id); self.stats.server_disconnecting(process_id, self.address.id);
Ok(conn) Ok(conn)
} }
Err(err) => { Err(err) => {
// Remove the temporary process_id from the stats. // Remove the temporary process_id from the stats.
self.stats.server_disconnecting(process_id); self.stats.server_disconnecting(process_id, self.address.id);
Err(err) Err(err)
} }
} }

View File

@@ -1,21 +1,21 @@
use crate::config::{get_config, Role}; use crate::config::{get_config, Role};
use crate::sharding::Sharder; use crate::sharding::{Sharder, ShardingFunction};
/// Route queries automatically based on explicitly requested /// Route queries automatically based on explicitly requested
/// or implied query characteristics. /// or implied query characteristics.
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use log::{debug, error}; use log::{debug, error};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use regex::RegexSet; use regex::{Regex, RegexSet};
use sqlparser::ast::Statement::{Query, StartTransaction}; use sqlparser::ast::Statement::{Query, StartTransaction};
use sqlparser::dialect::PostgreSqlDialect; use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser; use sqlparser::parser::Parser;
const CUSTOM_SQL_REGEXES: [&str; 5] = [ const CUSTOM_SQL_REGEXES: [&str; 5] = [
r"(?i)SET SHARDING KEY TO '[0-9]+'", r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
r"(?i)SET SHARD TO '[0-9]+'", r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
r"(?i)SHOW SHARD", r"(?i)^ *SHOW SHARD *;? *$",
r"(?i)SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)'", r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
r"(?i)SHOW SERVER ROLE", r"(?i)^ *SHOW SERVER ROLE *;? *$",
]; ];
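A quick sketch of what the anchored patterns above accept and reject (illustrative assertions, relying on the RegexSet import shown earlier):

#[test]
fn sketch_custom_sql_matching() {
    let set = RegexSet::new(CUSTOM_SQL_REGEXES).unwrap();

    assert!(set.is_match("SET SHARDING KEY TO 11235")); // no quotes needed
    assert!(set.is_match("set shard to 'any';")); // lowercase, quoted, trailing semicolon
    assert!(set.is_match("SET SERVER ROLE TO 'replica'"));

    // The ^...$ anchors prevent matches inside a larger query.
    assert!(!set.is_match("SELECT 'set shard to 1'"));
}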
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
@@ -27,8 +27,12 @@ pub enum Command {
ShowServerRole, ShowServerRole,
} }
// Quick test
static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new(); static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
// Capture value
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
pub struct QueryRouter { pub struct QueryRouter {
// By default, queries go here, unless we have better information // By default, queries go here, unless we have better information
// about what the client wants. // about what the client wants.
@@ -48,6 +52,9 @@ pub struct QueryRouter {
// Should we try to parse queries? // Should we try to parse queries?
query_parser_enabled: bool, query_parser_enabled: bool,
// Which sharding function are we using?
sharding_function: ShardingFunction,
} }
impl QueryRouter { impl QueryRouter {
@@ -60,6 +67,21 @@ impl QueryRouter {
} }
}; };
let list: Vec<_> = CUSTOM_SQL_REGEXES
.iter()
.map(|rgx| Regex::new(rgx).unwrap())
.collect();
// Impossible: both are built from the same CUSTOM_SQL_REGEXES array
if list.len() != set.len() {
return false;
}
match CUSTOM_SQL_REGEX_LIST.set(list) {
Ok(_) => true,
Err(_) => return false,
};
match CUSTOM_SQL_REGEX_SET.set(set) { match CUSTOM_SQL_REGEX_SET.set(set) {
Ok(_) => true, Ok(_) => true,
Err(_) => false, Err(_) => false,
@@ -76,6 +98,12 @@ impl QueryRouter {
_ => unreachable!(), _ => unreachable!(),
}; };
let sharding_function = match config.query_router.sharding_function.as_ref() {
"pg_bigint_hash" => ShardingFunction::PgBigintHash,
"sha1" => ShardingFunction::Sha1,
_ => unreachable!(),
};
QueryRouter { QueryRouter {
default_server_role: default_server_role, default_server_role: default_server_role,
shards: config.shards.len(), shards: config.shards.len(),
@@ -84,6 +112,7 @@ impl QueryRouter {
active_shard: None, active_shard: None,
primary_reads_enabled: config.query_router.primary_reads_enabled, primary_reads_enabled: config.query_router.primary_reads_enabled,
query_parser_enabled: config.query_router.query_parser_enabled, query_parser_enabled: config.query_router.query_parser_enabled,
sharding_function,
} }
} }
@@ -103,6 +132,11 @@ impl QueryRouter {
None => return None, None => return None,
}; };
let regex_list = match CUSTOM_SQL_REGEX_LIST.get() {
Some(regex_list) => regex_list,
None => return None,
};
let matches: Vec<_> = regex_set.matches(&query).into_iter().collect(); let matches: Vec<_> = regex_set.matches(&query).into_iter().collect();
if matches.len() != 1 { if matches.len() != 1 {
@@ -120,7 +154,19 @@ impl QueryRouter {
let mut value = match command { let mut value = match command {
Command::SetShardingKey | Command::SetShard | Command::SetServerRole => { Command::SetShardingKey | Command::SetShard | Command::SetServerRole => {
query.split("'").collect::<Vec<&str>>()[1].to_string() // Capture value. I know this re-runs the regex engine, but I haven't
// figured out a better way just yet. I think I can write a single Regex
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
//
// I think this is faster than running the Regex engine 5 times, so
// this is a strong maybe for me so far.
match regex_list[matches[0]].captures(&query) {
Some(captures) => match captures.get(1) {
Some(value) => value.as_str().to_string(),
None => return None,
},
None => return None,
}
} }
Command::ShowShard => self.shard().to_string(), Command::ShowShard => self.shard().to_string(),
@@ -139,14 +185,17 @@ impl QueryRouter {
match command { match command {
Command::SetShardingKey => { Command::SetShardingKey => {
let sharder = Sharder::new(self.shards); let sharder = Sharder::new(self.shards, self.sharding_function);
let shard = sharder.pg_bigint_hash(value.parse::<i64>().unwrap()); let shard = sharder.shard(value.parse::<i64>().unwrap());
self.active_shard = Some(shard); self.active_shard = Some(shard);
value = shard.to_string(); value = shard.to_string();
} }
Command::SetShard => { Command::SetShard => {
self.active_shard = Some(value.parse::<usize>().unwrap()); self.active_shard = match value.to_ascii_uppercase().as_ref() {
"ANY" => Some(rand::random::<usize>() % self.shards),
_ => Some(value.parse::<usize>().unwrap()),
};
} }
Command::SetServerRole => { Command::SetServerRole => {
@@ -194,7 +243,12 @@ impl QueryRouter {
let len = buf.get_i32() as usize; let len = buf.get_i32() as usize;
let query = match code { let query = match code {
'Q' => String::from_utf8_lossy(&buf[..len - 5]).to_string(), 'Q' => {
let query = String::from_utf8_lossy(&buf[..len - 5]).to_string();
debug!("Query: '{}'", query);
query
}
'P' => { 'P' => {
let mut start = 0; let mut start = 0;
let mut end; let mut end;
@@ -213,6 +267,8 @@ impl QueryRouter {
let query = String::from_utf8_lossy(&buf[start..end]).to_string(); let query = String::from_utf8_lossy(&buf[start..end]).to_string();
debug!("Prepared statement: '{}'", query);
query.replace("$", "") // Remove placeholders turning them into "values" query.replace("$", "") // Remove placeholders turning them into "values"
} }
_ => return false, _ => return false,
@@ -221,7 +277,7 @@ impl QueryRouter {
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) { let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
Ok(ast) => ast, Ok(ast) => ast,
Err(err) => { Err(err) => {
debug!("{:?}, query: {}", err, query); debug!("{}", err.to_string());
return false; return false;
} }
}; };
@@ -394,14 +450,38 @@ mod test {
"set server role to 'any'", "set server role to 'any'",
"set server role to 'auto'", "set server role to 'auto'",
"show server role", "show server role",
// No quotes
"SET SHARDING KEY TO 11235",
"SET SHARD TO 15",
// Spaces and semicolon
" SET SHARDING KEY TO 11235 ; ",
" SET SHARD TO 15; ",
" SET SHARDING KEY TO 11235 ;",
" SET SERVER ROLE TO 'primary'; ",
" SET SERVER ROLE TO 'primary' ; ",
" SET SERVER ROLE TO 'primary' ;",
]; ];
// Which regex in the list each test should match
let matches = [
0, 1, 2, 3, 3, 3, 3, 4, 0, 1, 2, 3, 3, 3, 3, 4, 0, 1, 0, 1, 0, 3, 3, 3,
];
let list = CUSTOM_SQL_REGEX_LIST.get().unwrap();
let set = CUSTOM_SQL_REGEX_SET.get().unwrap(); let set = CUSTOM_SQL_REGEX_SET.get().unwrap();
for test in &tests { for (i, test) in tests.iter().enumerate() {
let matches: Vec<_> = set.matches(test).into_iter().collect(); assert!(list[matches[i]].is_match(test));
assert_eq!(set.matches(test).into_iter().collect::<Vec<_>>().len(), 1);
}
assert_eq!(matches.len(), 1); let bad = [
"SELECT * FROM table",
"SELECT * FROM table WHERE value = 'set sharding key to 5'", // Don't capture things in the middle of the query
];
for query in &bad {
assert_eq!(set.matches(query).into_iter().collect::<Vec<_>>().len(), 0);
} }
} }
@@ -411,7 +491,7 @@ mod test {
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
// SetShardingKey // SetShardingKey
let query = simple_query("SET SHARDING KEY TO '13'"); let query = simple_query("SET SHARDING KEY TO 13");
assert_eq!( assert_eq!(
qr.try_execute_command(query), qr.try_execute_command(query),
Some((Command::SetShardingKey, String::from("1"))) Some((Command::SetShardingKey, String::from("1")))

View File

@@ -1,7 +1,7 @@
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
///! Implementation of the PostgreSQL server (database) protocol. ///! Implementation of the PostgreSQL server (database) protocol.
///! Here we are pretending to be a Postgres client. ///! Here we are pretending to be a Postgres client.
use log::{error, info}; use log::{debug, error, info, trace};
use tokio::io::{AsyncReadExt, BufReader}; use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{ use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf}, tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -75,6 +75,8 @@ impl Server {
} }
}; };
trace!("Sending StartupMessage");
// Send the startup packet telling the server we're a normal Postgres client. // Send the startup packet telling the server we're a normal Postgres client.
startup(&mut stream, &user.name, database).await?; startup(&mut stream, &user.name, database).await?;
@@ -95,6 +97,8 @@ impl Server {
Err(_) => return Err(Error::SocketError), Err(_) => return Err(Error::SocketError),
}; };
trace!("Message: {}", code);
match code { match code {
// Authentication // Authentication
'R' => { 'R' => {
@@ -104,6 +108,8 @@ impl Server {
Err(_) => return Err(Error::SocketError), Err(_) => return Err(Error::SocketError),
}; };
trace!("Auth: {}", auth_code);
match auth_code { match auth_code {
MD5_ENCRYPTED_PASSWORD => { MD5_ENCRYPTED_PASSWORD => {
// The salt is 4 bytes. // The salt is 4 bytes.
@@ -135,6 +141,8 @@ impl Server {
Err(_) => return Err(Error::SocketError), Err(_) => return Err(Error::SocketError),
}; };
trace!("Error: {}", error_code);
match error_code { match error_code {
// No error message is present in the message. // No error message is present in the message.
MESSAGE_TERMINATOR => (), MESSAGE_TERMINATOR => (),
@@ -247,6 +255,8 @@ impl Server {
} }
}; };
debug!("Sending CancelRequest");
let mut bytes = BytesMut::with_capacity(16); let mut bytes = BytesMut::with_capacity(16);
bytes.put_i32(16); bytes.put_i32(16);
bytes.put_i32(CANCEL_REQUEST_CODE); bytes.put_i32(CANCEL_REQUEST_CODE);
@@ -258,7 +268,8 @@ impl Server {
/// Send messages to the server from the client. /// Send messages to the server from the client.
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
self.stats.data_sent(messages.len()); self.stats
.data_sent(messages.len(), self.process_id, self.address.id);
match write_all_half(&mut self.write, messages).await { match write_all_half(&mut self.write, messages).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
@@ -290,6 +301,8 @@ impl Server {
let code = message.get_u8() as char; let code = message.get_u8() as char;
let _len = message.get_i32(); let _len = message.get_i32();
trace!("Message: {}", code);
match code { match code {
// ReadyForQuery // ReadyForQuery
'Z' => { 'Z' => {
@@ -362,7 +375,8 @@ impl Server {
let bytes = self.buffer.clone(); let bytes = self.buffer.clone();
// Keep track of how much data we got from the server for stats. // Keep track of how much data we got from the server for stats.
self.stats.data_received(bytes.len()); self.stats
.data_received(bytes.len(), self.process_id, self.address.id);
// Clear the buffer for next query. // Clear the buffer for next query.
self.buffer.clear(); self.buffer.clear();
@@ -403,7 +417,7 @@ impl Server {
/// Claim this server as mine for the purposes of query cancellation. /// Claim this server as mine for the purposes of query cancellation.
pub fn claim(&mut self, process_id: i32, secret_key: i32) { pub fn claim(&mut self, process_id: i32, secret_key: i32) {
let mut guard = self.client_server_map.lock().unwrap(); let mut guard = self.client_server_map.lock();
guard.insert( guard.insert(
(process_id, secret_key), (process_id, secret_key),
( (
@@ -419,18 +433,9 @@ impl Server {
/// It will use the simple query protocol. /// It will use the simple query protocol.
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`. /// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
pub async fn query(&mut self, query: &str) -> Result<(), Error> { pub async fn query(&mut self, query: &str) -> Result<(), Error> {
let mut query = BytesMut::from(&query.as_bytes()[..]); let query = simple_query(query);
query.put_u8(0); // C-string terminator (NULL character).
let len = query.len() as i32 + 4; self.send(query).await?;
let mut msg = BytesMut::with_capacity(len as usize + 1);
msg.put_u8(b'Q');
msg.put_i32(len);
msg.put_slice(&query[..]);
self.send(msg).await?;
loop { loop {
let _ = self.recv().await?; let _ = self.recv().await?;
@@ -467,7 +472,8 @@ impl Drop for Server {
/// the socket is in non-blocking mode, so it may not be ready /// the socket is in non-blocking mode, so it may not be ready
/// for a write. /// for a write.
fn drop(&mut self) { fn drop(&mut self) {
self.stats.server_disconnecting(self.process_id()); self.stats
.server_disconnecting(self.process_id(), self.address.id);
let mut bytes = BytesMut::with_capacity(4); let mut bytes = BytesMut::with_capacity(4);
bytes.put_u8(b'X'); bytes.put_u8(b'X');

View File

@@ -1,26 +1,62 @@
use sha1::{Digest, Sha1};
// https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20 // https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20
const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD; const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD;
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum ShardingFunction {
PgBigintHash,
Sha1,
}
pub struct Sharder { pub struct Sharder {
shards: usize, shards: usize,
sharding_function: ShardingFunction,
} }
impl Sharder { impl Sharder {
pub fn new(shards: usize) -> Sharder { pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder {
Sharder { shards: shards } Sharder {
shards,
sharding_function,
}
}
pub fn shard(&self, key: i64) -> usize {
match self.sharding_function {
ShardingFunction::PgBigintHash => self.pg_bigint_hash(key),
ShardingFunction::Sha1 => self.sha1(key),
}
} }
/// Hash function used by Postgres to determine which partition /// Hash function used by Postgres to determine which partition
/// to put the row in when using HASH(column) partitioning. /// to put the row in when using HASH(column) partitioning.
/// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631 /// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631
/// Supports only 1 bigint at the moment, but we can add more later. /// Supports only 1 bigint at the moment, but we can add more later.
pub fn pg_bigint_hash(&self, key: i64) -> usize { fn pg_bigint_hash(&self, key: i64) -> usize {
let mut lohalf = key as u32; let mut lohalf = key as u32;
let hihalf = (key >> 32) as u32; let hihalf = (key >> 32) as u32;
lohalf ^= if key >= 0 { hihalf } else { !hihalf }; lohalf ^= if key >= 0 { hihalf } else { !hihalf };
Self::combine(0, Self::pg_u32_hash(lohalf)) as usize % self.shards Self::combine(0, Self::pg_u32_hash(lohalf)) as usize % self.shards
} }
/// Example of a hashing function based on SHA1.
fn sha1(&self, key: i64) -> usize {
let mut hasher = Sha1::new();
hasher.update(&key.to_string().as_bytes());
let result = hasher.finalize();
// Convert the SHA1 hash into hex so we can parse it as a large integer.
let hex = format!("{:x}", result);
// Parse the last 8 hex digits (4 bytes) as an integer; it fits comfortably in a bigint.
let key = i64::from_str_radix(&hex[hex.len() - 8..], 16).unwrap() as usize;
key % self.shards
}
#[inline] #[inline]
fn rot(x: u32, k: u32) -> u32 { fn rot(x: u32, k: u32) -> u32 {
(x << k) | (x >> (32 - k)) (x << k) | (x >> (32 - k))
@@ -109,36 +145,51 @@ mod test {
// confirming that we implemented Postgres BIGINT hashing correctly. // confirming that we implemented Postgres BIGINT hashing correctly.
#[test] #[test]
fn test_pg_bigint_hash() { fn test_pg_bigint_hash() {
let sharder = Sharder::new(5); let sharder = Sharder::new(5, ShardingFunction::PgBigintHash);
let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53]; let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53];
for v in shard_0 { for v in shard_0 {
assert_eq!(sharder.pg_bigint_hash(v), 0); assert_eq!(sharder.shard(v), 0);
} }
let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54]; let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54];
for v in shard_1 { for v in shard_1 {
assert_eq!(sharder.pg_bigint_hash(v), 1); assert_eq!(sharder.shard(v), 1);
} }
let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35]; let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35];
for v in shard_2 { for v in shard_2 {
assert_eq!(sharder.pg_bigint_hash(v), 2); assert_eq!(sharder.shard(v), 2);
} }
let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43]; let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43];
for v in shard_3 { for v in shard_3 {
assert_eq!(sharder.pg_bigint_hash(v), 3); assert_eq!(sharder.shard(v), 3);
} }
let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45]; let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45];
for v in shard_4 { for v in shard_4 {
assert_eq!(sharder.pg_bigint_hash(v), 4); assert_eq!(sharder.shard(v), 4);
}
}
#[test]
fn test_sha1_hash() {
let sharder = Sharder::new(12, ShardingFunction::Sha1);
let ids = vec![
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let shards = vec![
4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3,
];
for (i, id) in ids.iter().enumerate() {
assert_eq!(sharder.shard(*id), shards[i]);
} }
} }
} }
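A short usage sketch, reusing the test vectors above:

let sharder = Sharder::new(5, ShardingFunction::PgBigintHash);
assert_eq!(sharder.shard(1), 0); // matches the shard_0 test vector

let sharder = Sharder::new(12, ShardingFunction::Sha1);
assert_eq!(sharder.shard(0), 4); // matches the sha1 test vector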

View File

@@ -1,13 +1,20 @@
/// Statistics and reporting.
use log::info; use log::info;
use statsd::Client; use once_cell::sync::Lazy;
/// Events collector and publisher. use parking_lot::Mutex;
use std::collections::HashMap;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use std::collections::HashMap; // Latest stats updated every second; used in SHOW STATS and other admin commands.
use std::time::Instant; static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
use crate::config::get_config; // Statistics period used for average calculations.
// 15 seconds.
static STAT_PERIOD: u64 = 15000;
/// The names for the events reported
/// to the statistics collector.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum EventName { enum EventName {
CheckoutTime, CheckoutTime,
@@ -24,188 +31,266 @@ enum EventName {
ServerTested, ServerTested,
ServerLogin, ServerLogin,
ServerDisconnecting, ServerDisconnecting,
UpdateStats,
UpdateAverages,
} }
/// Event data sent to the collector
/// from clients and servers.
#[derive(Debug)] #[derive(Debug)]
pub struct Event { pub struct Event {
/// The name of the event being reported.
name: EventName, name: EventName,
/// The value being reported. Meaning differs based on event name.
value: i64, value: i64,
process_id: Option<i32>,
/// The client or server connection reporting the event.
process_id: i32,
/// The server the client is connected to.
address_id: usize,
} }
/// The statistics reporter. An instance is given
/// to each possible source of statistics,
/// e.g. clients, servers, connection pool.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Reporter { pub struct Reporter {
tx: Sender<Event>, tx: Sender<Event>,
} }
impl Reporter { impl Reporter {
/// Create a new Reporter instance.
pub fn new(tx: Sender<Event>) -> Reporter { pub fn new(tx: Sender<Event>) -> Reporter {
Reporter { tx: tx } Reporter { tx: tx }
} }
pub fn query(&self) { /// Report a query executed by a client against
let statistic = Event { /// a server identified by the `address_id`.
pub fn query(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::Query, name: EventName::Query,
value: 1, value: 1,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn transaction(&self) { /// Report a transaction executed by a client against
let statistic = Event { /// a server identified by the `address_id`.
pub fn transaction(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::Transaction, name: EventName::Transaction,
value: 1, value: 1,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn data_sent(&self, amount: usize) { /// Report data sent to a server identified by `address_id`.
let statistic = Event { /// The `amount` is measured in bytes.
pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::DataSent, name: EventName::DataSent,
value: amount as i64, value: amount as i64,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn data_received(&self, amount: usize) { /// Report data received from a server identified by `address_id`.
let statistic = Event { /// The `amount` is measured in bytes.
pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::DataReceived, name: EventName::DataReceived,
value: amount as i64, value: amount as i64,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn checkout_time(&self, ms: u128) { /// Time spent waiting to get a healthy connection from the pool
let statistic = Event { /// for a server identified by `address_id`.
/// Measured in microseconds.
pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::CheckoutTime, name: EventName::CheckoutTime,
value: ms as i64, value: ms as i64,
process_id: None, process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn client_waiting(&self, process_id: i32) { /// Reports a client identified by `process_id` waiting for a connection
let statistic = Event { /// to a server identified by `address_id`.
pub fn client_waiting(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientWaiting, name: EventName::ClientWaiting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn client_active(&self, process_id: i32) { /// Reports a client identified by `process_id` is done waiting for a connection
let statistic = Event { /// to a server identified by `address_id` and is about to query the server.
pub fn client_active(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientActive, name: EventName::ClientActive,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn client_idle(&self, process_id: i32) { /// Reports a client identified by `process_id` is done querying the server
let statistic = Event { /// identified by `address_id` and is no longer active.
pub fn client_idle(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientIdle, name: EventName::ClientIdle,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn client_disconnecting(&self, process_id: i32) { /// Reports a client identified by `process_id` is disconnecting from the pooler.
let statistic = Event { /// The last server it was connected to is identified by `address_id`.
pub fn client_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ClientDisconnecting, name: EventName::ClientDisconnecting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn server_active(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
let statistic = Event { /// a configured server identified by `address_id` is actively used
/// by a client.
pub fn server_active(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerActive, name: EventName::ServerActive,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn server_idle(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
let statistic = Event { /// a configured server identified by `address_id` is no longer
/// actively used by a client and is now idle.
pub fn server_idle(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerIdle, name: EventName::ServerIdle,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn server_login(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
let statistic = Event { /// a configured server identified by `address_id` is attempting
/// to login.
pub fn server_login(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerLogin, name: EventName::ServerLogin,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn server_tested(&self, process_id: i32) { /// Reports a server connection identified by `process_id` for
let statistic = Event { /// a configured server identified by `address_id` is being
/// tested before being given to a client.
pub fn server_tested(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerTested, name: EventName::ServerTested,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
pub fn server_disconnecting(&self, process_id: i32) { /// Reports a server connection identified by `process_id` is disconnecting from the pooler.
let statistic = Event { /// The configured server it was connected to is identified by `address_id`.
pub fn server_disconnecting(&self, process_id: i32, address_id: usize) {
let event = Event {
name: EventName::ServerDisconnecting, name: EventName::ServerDisconnecting,
value: 1, value: 1,
process_id: Some(process_id), process_id: process_id,
address_id: address_id,
}; };
let _ = self.tx.try_send(statistic); let _ = self.tx.try_send(event);
} }
} }
/// The statistics collector which is receiving statistics
/// from clients, servers, and the connection pool. There is
/// only one collector (kind of like a singleton).
/// The collector can trigger events on its own, e.g.
/// it updates aggregates every second and averages every
/// 15 seconds.
pub struct Collector { pub struct Collector {
rx: Receiver<Event>, rx: Receiver<Event>,
client: Client, tx: Sender<Event>,
} }
impl Collector { impl Collector {
pub fn new(rx: Receiver<Event>) -> Collector { /// Create a new collector instance. There should only be one instance
Collector { /// at a time. This is ensured by mpsc which allows only one receiver.
rx: rx, pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(), Collector { rx, tx }
}
} }
pub async fn collect(&mut self) { /// The statistics collection handler. It will collect statistics
/// for `address_id`s starting at 0 up to `addresses`.
pub async fn collect(&mut self, addresses: usize) {
info!("Events reporter started"); info!("Events reporter started");
let mut stats = HashMap::from([ let stats_template = HashMap::from([
("total_query_count", 0), ("total_query_count", 0),
("total_xact_count", 0), ("total_xact_count", 0),
("total_sent", 0), ("total_sent", 0),
("total_received", 0), ("total_received", 0),
("total_xact_time", 0),
("total_query_time", 0),
("total_wait_time", 0), ("total_wait_time", 0),
("avg_xact_time", 0),
("avg_query_time", 0),
("avg_xact_count", 0),
("avg_sent", 0),
("avg_received", 0),
("avg_wait_time", 0),
("maxwait_us", 0), ("maxwait_us", 0),
("maxwait", 0), ("maxwait", 0),
("cl_waiting", 0), ("cl_waiting", 0),
@@ -217,10 +302,51 @@ impl Collector {
("sv_tested", 0), ("sv_tested", 0),
]); ]);
let mut client_server_states: HashMap<i32, EventName> = HashMap::new(); let mut stats = HashMap::new();
let mut now = Instant::now(); // Stats saved after each iteration of the flush event. Used in calculation
// of averages in the last flush period.
let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new();
// Track which state the client and server are at any given time.
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
// Trigger a stats snapshot every second and average calculations every 15 seconds.
let tx = self.tx.clone();
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15));
loop {
interval.tick().await;
for address_id in 0..addresses {
let _ = tx.try_send(Event {
name: EventName::UpdateStats,
value: 0,
process_id: -1,
address_id: address_id,
});
}
}
});
let tx = self.tx.clone();
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
loop {
interval.tick().await;
for address_id in 0..addresses {
let _ = tx.try_send(Event {
name: EventName::UpdateAverages,
value: 0,
process_id: -1,
address_id: address_id,
});
}
}
});
// The collector loop
loop { loop {
let stat = match self.rx.recv().await { let stat = match self.rx.recv().await {
Some(stat) => stat, Some(stat) => stat,
@@ -230,6 +356,14 @@ impl Collector {
} }
}; };
let stats = stats
.entry(stat.address_id)
.or_insert(stats_template.clone());
let client_server_states = client_server_states
.entry(stat.address_id)
.or_insert(HashMap::new());
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
// Some are counters, some are gauges... // Some are counters, some are gauges...
match stat.name { match stat.name {
EventName::Query => { EventName::Query => {
@@ -257,10 +391,11 @@ impl Collector {
*counter += stat.value; *counter += stat.value;
let counter = stats.entry("maxwait_us").or_insert(0); let counter = stats.entry("maxwait_us").or_insert(0);
let mic_part = stat.value % 1_000_000;
// Report max time here // Report max time here
if stat.value > *counter { if mic_part > *counter {
*counter = stat.value; *counter = mic_part;
} }
let counter = stats.entry("maxwait").or_insert(0); let counter = stats.entry("maxwait").or_insert(0);
@@ -278,71 +413,105 @@ impl Collector {
| EventName::ServerIdle | EventName::ServerIdle
| EventName::ServerTested | EventName::ServerTested
| EventName::ServerLogin => { | EventName::ServerLogin => {
client_server_states.insert(stat.process_id.unwrap(), stat.name); client_server_states.insert(stat.process_id, stat.name);
} }
EventName::ClientDisconnecting | EventName::ServerDisconnecting => { EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
client_server_states.remove(&stat.process_id.unwrap()); client_server_states.remove(&stat.process_id);
}
EventName::UpdateStats => {
// Calculate connection states
for (_, state) in client_server_states.iter() {
match state {
EventName::ClientActive => {
let counter = stats.entry("cl_active").or_insert(0);
*counter += 1;
}
EventName::ClientWaiting => {
let counter = stats.entry("cl_waiting").or_insert(0);
*counter += 1;
}
EventName::ServerIdle => {
let counter = stats.entry("sv_idle").or_insert(0);
*counter += 1;
}
EventName::ServerActive => {
let counter = stats.entry("sv_active").or_insert(0);
*counter += 1;
}
EventName::ServerTested => {
let counter = stats.entry("sv_tested").or_insert(0);
*counter += 1;
}
EventName::ServerLogin => {
let counter = stats.entry("sv_login").or_insert(0);
*counter += 1;
}
EventName::ClientIdle => {
let counter = stats.entry("cl_idle").or_insert(0);
*counter += 1;
}
_ => unreachable!(),
};
}
// Update latest stats used in SHOW STATS
let mut guard = LATEST_STATS.lock();
for (key, value) in stats.iter() {
let entry = guard.entry(stat.address_id).or_insert(HashMap::new());
entry.insert(key.to_string(), value.clone());
}
// These are re-calculated every iteration of the loop, so we don't want to add values
// from the last iteration.
for stat in &[
"cl_active",
"cl_waiting",
"cl_idle",
"sv_idle",
"sv_active",
"sv_tested",
"sv_login",
"maxwait",
"maxwait_us",
] {
stats.insert(stat, 0);
}
}
EventName::UpdateAverages => {
// Calculate averages
for stat in &[
"avg_query_count",
"avgxact_count",
"avg_sent",
"avg_received",
"avg_wait_time",
] {
let total_name = stat.replace("avg_", "total_");
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
stats.insert(stat, avg);
*old_value = new_value;
}
} }
}; };
// It's been 15 seconds. If there is no traffic, it won't publish anything,
// but it also doesn't matter then.
if now.elapsed().as_secs() > 15 {
for (_, state) in &client_server_states {
match state {
EventName::ClientActive => {
let counter = stats.entry("cl_active").or_insert(0);
*counter += 1;
}
EventName::ClientWaiting => {
let counter = stats.entry("cl_waiting").or_insert(0);
*counter += 1;
}
EventName::ClientIdle => {
let counter = stats.entry("cl_idle").or_insert(0);
*counter += 1;
}
EventName::ServerIdle => {
let counter = stats.entry("sv_idle").or_insert(0);
*counter += 1;
}
EventName::ServerActive => {
let counter = stats.entry("sv_active").or_insert(0);
*counter += 1;
}
EventName::ServerTested => {
let counter = stats.entry("sv_tested").or_insert(0);
*counter += 1;
}
EventName::ServerLogin => {
let counter = stats.entry("sv_login").or_insert(0);
*counter += 1;
}
_ => unreachable!(),
};
}
info!("{:?}", stats);
let mut pipeline = self.client.pipeline();
for (key, value) in stats.iter_mut() {
pipeline.gauge(key, *value as f64);
*value = 0;
}
pipeline.send(&self.client);
now = Instant::now();
}
} }
} }
} }
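A worked example of the average calculation above, with assumed totals: averages are the per-second deltas of the running totals over one 15-second period.

// Assumed numbers, purely illustrative.
let (old_total, new_total) = (3_000i64, 7_500i64);
let avg = (new_total - old_total) / (15_000i64 / 1_000); // STAT_PERIOD / 1_000
assert_eq!(avg, 300); // 300 queries per second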
/// Get a snapshot of statistics. Updated once a second
/// by the `Collector`.
pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
LATEST_STATS.lock().clone()
}
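A hedged wiring sketch showing how a Reporter and the Collector fit together; the channel capacity and address count are assumptions:

use tokio::sync::mpsc::channel;

#[tokio::main]
async fn main() {
    // Capacity is an assumption; events are silently dropped (try_send) if the channel is full.
    let (tx, rx) = channel(100_000);

    // One Collector; it also sends itself UpdateStats/UpdateAverages events.
    let mut collector = Collector::new(rx, tx.clone());
    let total_addresses = 4; // number of configured server addresses (assumed)
    tokio::task::spawn(async move {
        collector.collect(total_addresses).await;
    });

    // Every client and server holds a cheap clone of the Reporter.
    let reporter = Reporter::new(tx.clone());
    reporter.query(/* process_id */ 1, /* address_id */ 0);
}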

28
tests/pgbench/simple.sql Normal file
View File

@@ -0,0 +1,28 @@
-- \setrandom aid 1 :naccounts
\set aid random(1, 100000)
-- \setrandom bid 1 :nbranches
\set bid random(1, 100000)
-- \setrandom tid 1 :ntellers
\set tid random(1, 100000)
-- \setrandom delta -5000 5000
\set delta random(-5000,5000)
\set shard random(0, 2)
SET SHARD TO :shard;
BEGIN;
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
END;