mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
Compare commits
17 Commits
v0.1.0-alp
...
sven_md5_a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5f745859c0 | ||
|
|
1e8fa110ae | ||
|
|
d4186b7815 | ||
|
|
aaeef69d59 | ||
|
|
b21e0f4a7e | ||
|
|
eb1473060e | ||
|
|
26f75f8d5d | ||
|
|
99d65fc475 | ||
|
|
206fdc9769 | ||
|
|
f74101cdfe | ||
|
|
8e0682482d | ||
|
|
6db51b4a11 | ||
|
|
a784883611 | ||
|
|
5972b6fa52 | ||
|
|
b3c8ca4b8a | ||
|
|
dce72ba262 | ||
|
|
af1716bcd7 |
108
.circleci/pgcat.toml
Normal file
108
.circleci/pgcat.toml
Normal file
@@ -0,0 +1,108 @@
|
||||
#
|
||||
# PgCat config example.
|
||||
#
|
||||
|
||||
#
|
||||
# General pooler settings
|
||||
[general]
|
||||
|
||||
# What IP to run on, 0.0.0.0 means accessible from everywhere.
|
||||
host = "0.0.0.0"
|
||||
|
||||
# Port to run on, same as PgBouncer used in this example.
|
||||
port = 6432
|
||||
|
||||
# How many connections to allocate per server.
|
||||
pool_size = 15
|
||||
|
||||
# Pool mode (see PgBouncer docs for more).
|
||||
# session: one server connection per connected client
|
||||
# transaction: one server connection per client transaction
|
||||
pool_mode = "transaction"
|
||||
|
||||
# How long to wait before aborting a server connection (ms).
|
||||
connect_timeout = 100
|
||||
|
||||
# How much time to give `SELECT 1` health check query to return with a result (ms).
|
||||
healthcheck_timeout = 100
|
||||
|
||||
# For how long to ban a server if it fails a health check (seconds).
|
||||
ban_time = 60 # Seconds
|
||||
|
||||
# Stats will be sent here
|
||||
statsd_address = "127.0.0.1:8125"
|
||||
|
||||
#
|
||||
# User to use for authentication against the server.
|
||||
[user]
|
||||
name = "sharding_user"
|
||||
password = "sharding_user"
|
||||
|
||||
|
||||
#
|
||||
# Shards in the cluster
|
||||
[shards]
|
||||
|
||||
# Shard 0
|
||||
[shards.0]
|
||||
|
||||
# [ host, port, role ]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5433, "replica" ],
|
||||
# [ "127.0.1.1", 5432, "replica" ],
|
||||
]
|
||||
# Database name (e.g. "postgres")
|
||||
database = "shard0"
|
||||
|
||||
[shards.1]
|
||||
# [ host, port, role ]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5433, "replica" ],
|
||||
# [ "127.0.1.1", 5432, "replica" ],
|
||||
]
|
||||
database = "shard1"
|
||||
|
||||
[shards.2]
|
||||
# [ host, port, role ]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5433, "replica" ],
|
||||
# [ "127.0.1.1", 5432, "replica" ],
|
||||
]
|
||||
database = "shard2"
|
||||
|
||||
|
||||
# Settings for our query routing layer.
|
||||
[query_router]
|
||||
|
||||
# If the client doesn't specify, route traffic to
|
||||
# this role by default.
|
||||
#
|
||||
# any: round-robin between primary and replicas,
|
||||
# replica: round-robin between replicas only without touching the primary,
|
||||
# primary: all queries go to the primary unless otherwise specified.
|
||||
default_role = "any"
|
||||
|
||||
|
||||
# Query parser. If enabled, we'll attempt to parse
|
||||
# every incoming query to determine if it's a read or a write.
|
||||
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
|
||||
# we'll direct it to the primary.
|
||||
query_parser_enabled = false
|
||||
|
||||
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
|
||||
# load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
# queries. The primary can always be explicitely selected with our custom protocol.
|
||||
primary_reads_enabled = true
|
||||
|
||||
# So what if you wanted to implement a different hashing function,
|
||||
# or you've already built one and you want this pooler to use it?
|
||||
#
|
||||
# Current options:
|
||||
#
|
||||
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
|
||||
# sha1: A hashing function based on SHA1
|
||||
#
|
||||
sharding_function = "pg_bigint_hash"
|
||||
@@ -3,20 +3,34 @@
|
||||
set -e
|
||||
set -o xtrace
|
||||
|
||||
# Start PgCat with a particular log level
|
||||
# for inspection.
|
||||
function start_pgcat() {
|
||||
kill -s SIGINT $(pgrep pgcat) || true
|
||||
RUST_LOG=${1} ./target/debug/pgcat .circleci/pgcat.toml &
|
||||
sleep 1
|
||||
}
|
||||
|
||||
# Setup the database with shards and user
|
||||
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
|
||||
./target/debug/pgcat &
|
||||
# Install Toxiproxy to simulate a downed/slow database
|
||||
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
|
||||
sudo dpkg -i toxiproxy-2.1.4.deb
|
||||
|
||||
# Start Toxiproxy
|
||||
toxiproxy-server &
|
||||
sleep 1
|
||||
|
||||
# Setup PgBench
|
||||
pgbench -i -h 127.0.0.1 -p 6432
|
||||
# Create a database at port 5433, forward it to Postgres
|
||||
toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica
|
||||
|
||||
# Run it
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple
|
||||
start_pgcat "info"
|
||||
|
||||
# Extended protocol
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
||||
# pgbench test
|
||||
pgbench -i -h 127.0.0.1 -p 6432 && \
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \
|
||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
||||
|
||||
# COPY TO STDOUT test
|
||||
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null
|
||||
@@ -35,18 +49,48 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /
|
||||
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
|
||||
|
||||
#
|
||||
# ActiveRecord tests!
|
||||
# ActiveRecord tests
|
||||
#
|
||||
cd tests/ruby
|
||||
sudo gem install bundler
|
||||
bundle install
|
||||
ruby tests.rb
|
||||
cd tests/ruby && \
|
||||
sudo gem install bundler && \
|
||||
bundle install && \
|
||||
ruby tests.rb && \
|
||||
cd ../..
|
||||
|
||||
# Admin tests
|
||||
psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
||||
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||
|
||||
# Start PgCat in debug to demonstrate failover better
|
||||
start_pgcat "debug"
|
||||
|
||||
# Add latency to the replica at port 5433 slightly above the healthcheck timeout
|
||||
toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica
|
||||
sleep 1
|
||||
|
||||
# Note the failover in the logs
|
||||
timeout 5 psql -e -h 127.0.0.1 -p 6432 <<-EOF
|
||||
SELECT 1;
|
||||
SELECT 1;
|
||||
SELECT 1;
|
||||
EOF
|
||||
|
||||
# Remove latency
|
||||
toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica
|
||||
|
||||
start_pgcat "info"
|
||||
|
||||
cd ../../
|
||||
# Test session mode (and config reload)
|
||||
sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' pgcat.toml
|
||||
|
||||
# Reload config
|
||||
# Reload config test
|
||||
kill -SIGHUP $(pgrep pgcat)
|
||||
|
||||
# Prepared statements that will only work in session mode
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,5 @@
|
||||
/target
|
||||
*.deb
|
||||
.idea/*
|
||||
tests/ruby/.bundle/*
|
||||
tests/ruby/vendor/*
|
||||
25
Cargo.lock
generated
25
Cargo.lock
generated
@@ -220,6 +220,12 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.117"
|
||||
@@ -364,10 +370,12 @@ dependencies = [
|
||||
"md-5",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"rand",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"sha-1",
|
||||
"sqlparser",
|
||||
"statsd",
|
||||
@@ -477,6 +485,12 @@ version = "0.6.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@@ -500,6 +514,17 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.79"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha-1"
|
||||
version = "0.10.0"
|
||||
|
||||
@@ -17,6 +17,7 @@ sha-1 = "0.10"
|
||||
toml = "0.5"
|
||||
serde = "1"
|
||||
serde_derive = "1"
|
||||
serde_json = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
@@ -25,3 +26,4 @@ sqlparser = "0.14"
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
env_logger = "0.9"
|
||||
parking_lot = "0.11"
|
||||
|
||||
59
README.md
59
README.md
@@ -11,14 +11,14 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su
|
||||
## Features
|
||||
| **Feature** | **Status** | **Comments** |
|
||||
|--------------------------------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| Transaction pooling | :heavy_check_mark: | Identical to PgBouncer. |
|
||||
| Session pooling | :heavy_check_mark: | Identical to PgBouncer. |
|
||||
| `COPY` support | :heavy_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
|
||||
| Query cancellation | :heavy_check_mark: | Supported both in transaction and session pooling modes. |
|
||||
| Load balancing of read queries | :heavy_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
|
||||
| Sharding | :heavy_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
|
||||
| Failover | :heavy_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
|
||||
| Statistics reporting | :heavy_check_mark: | Statistics similar to PgBouncers are reported via StatsD. |
|
||||
| Transaction pooling | :white_check_mark: | Identical to PgBouncer. |
|
||||
| Session pooling | :white_check_mark: | Identical to PgBouncer. |
|
||||
| `COPY` support | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
|
||||
| Query cancellation | :white_check_mark: | Supported both in transaction and session pooling modes. |
|
||||
| Load balancing of read queries | :white_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
|
||||
| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
|
||||
| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
|
||||
| Statistics reporting | :white_check_mark: | Statistics similar to PgBouncers are reported via StatsD. |
|
||||
| Live configuration reloading | :construction_worker: | Reload config with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)`. Not all settings can be reloaded without a restart. |
|
||||
| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. |
|
||||
|
||||
@@ -75,15 +75,15 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
||||
|
||||
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
||||
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||
| Transaction pooling | :heavy_check_mark: | :heavy_check_mark: | Used by default for all tests. |
|
||||
| Session pooling | :heavy_check_mark: | :heavy_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. |
|
||||
| `COPY` | :heavy_check_mark: | :heavy_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
|
||||
| Query cancellation | :heavy_check_mark: | :heavy_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
|
||||
| Load balancing | :x: | :heavy_check_mark: | We could test this by emitting statistics for each replica and compare them. |
|
||||
| Failover | :x: | :heavy_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. |
|
||||
| Sharding | :heavy_check_mark: | :heavy_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
|
||||
| Statistics reporting | :x: | :heavy_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |
|
||||
| Live config reloading | :heavy_check_mark: | :heavy_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
|
||||
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
||||
| Session pooling | :white_check_mark: | :white_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. |
|
||||
| `COPY` | :white_check_mark: | :white_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
|
||||
| Query cancellation | :white_check_mark: | :white_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
|
||||
| Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. |
|
||||
| Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. |
|
||||
| Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
|
||||
| Statistics reporting | :x: | :white_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |
|
||||
| Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -133,6 +133,31 @@ All servers are checked with a `SELECT 1` query before being given to a client.
|
||||
|
||||
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
||||
|
||||
Failover behavior can get pretty interesting (read complex) when multiple configurations and factors are involved. The table below will try to explain what PgCat does in each scenario:
|
||||
|
||||
| **Query** | **`SET SERVER ROLE TO`** | **`query_parser_enabled`** | **`primary_reads_enabled`** | **Target state** | **Outcome** |
|
||||
|---------------------------|--------------------------|----------------------------|-----------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| Read query, i.e. `SELECT` | unset (any) | false | false | up | Query is routed to the first instance in the round-robin loop. |
|
||||
| Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the round-robin loop. |
|
||||
| Read query | unset (any) | true | true | up | Query is routed to the first instance in the round-robin loop. |
|
||||
| Read query | replica | false | false | up | Query is routed to the first replica instance in the round-robin loop. |
|
||||
| Read query | primary | false | false | up | Query is routed to the primary. |
|
||||
| Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the round-robin loop is attempted. |
|
||||
| Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
|
||||
| Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the round-robin loop. |
|
||||
| Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
|
||||
| Read query | primary | false | false | down | The query is attempted against the primary and fails. The client receives an error. |
|
||||
| | | | | | |
|
||||
| Write query e.g. `INSERT` | unset (any) | false | false | up | The query is attempted against the first available instance in the round-robin loop. If the instance is a replica, the query fails and the client receives an error. |
|
||||
| Write query | unset (any) | true | false | up | The query is routed to the primary. |
|
||||
| Write query | unset (any) | true | true | up | The query is routed to the primary. |
|
||||
| Write query | primary | false | false | up | The query is routed to the primary. |
|
||||
| Write query | replica | false | false | up | The query is routed to the replica and fails. The client receives an error. |
|
||||
| Write query | unset (any) | true | false | down | The query is routed to the primary and fails. The client receives an error. |
|
||||
| Write query | unset (any) | true | true | down | The query is routed to the primary and fails. The client receives an error. |
|
||||
| Write query | primary | false | false | down | The query is routed to the primary and fails. The client receives an error. |
|
||||
| | | | | | |
|
||||
|
||||
### Sharding
|
||||
We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
|
||||
|
||||
|
||||
22
pgcat.toml
22
pgcat.toml
@@ -48,8 +48,8 @@ password = "sharding_user"
|
||||
|
||||
# [ host, port, role ]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5432, "replica" ],
|
||||
["127.0.0.1", 5432, "primary"],
|
||||
["localhost", 5432, "replica"],
|
||||
# [ "127.0.1.1", 5432, "replica" ],
|
||||
]
|
||||
# Database name (e.g. "postgres")
|
||||
@@ -58,8 +58,8 @@ database = "shard0"
|
||||
[shards.1]
|
||||
# [ host, port, role ]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5432, "replica" ],
|
||||
["127.0.0.1", 5432, "primary"],
|
||||
["localhost", 5432, "replica"],
|
||||
# [ "127.0.1.1", 5432, "replica" ],
|
||||
]
|
||||
database = "shard1"
|
||||
@@ -67,8 +67,8 @@ database = "shard1"
|
||||
[shards.2]
|
||||
# [ host, port, role ]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5432, "replica" ],
|
||||
["127.0.0.1", 5432, "primary"],
|
||||
["localhost", 5432, "replica"],
|
||||
# [ "127.0.1.1", 5432, "replica" ],
|
||||
]
|
||||
database = "shard2"
|
||||
@@ -96,3 +96,13 @@ query_parser_enabled = false
|
||||
# load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
# queries. The primary can always be explicitely selected with our custom protocol.
|
||||
primary_reads_enabled = true
|
||||
|
||||
# So what if you wanted to implement a different hashing function,
|
||||
# or you've already built one and you want this pooler to use it?
|
||||
#
|
||||
# Current options:
|
||||
#
|
||||
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
|
||||
# sha1: A hashing function based on SHA1
|
||||
#
|
||||
sharding_function = "pg_bigint_hash"
|
||||
|
||||
351
src/admin.rs
Normal file
351
src/admin.rs
Normal file
@@ -0,0 +1,351 @@
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{info, trace};
|
||||
use tokio::net::tcp::OwnedWriteHalf;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::config::{get_config, parse};
|
||||
use crate::errors::Error;
|
||||
use crate::messages::*;
|
||||
use crate::pool::ConnectionPool;
|
||||
use crate::stats::get_stats;
|
||||
|
||||
/// Handle admin client
|
||||
pub async fn handle_admin(
|
||||
stream: &mut OwnedWriteHalf,
|
||||
mut query: BytesMut,
|
||||
pool: ConnectionPool,
|
||||
) -> Result<(), Error> {
|
||||
let code = query.get_u8() as char;
|
||||
|
||||
if code != 'Q' {
|
||||
return Err(Error::ProtocolSyncError);
|
||||
}
|
||||
|
||||
let len = query.get_i32() as usize;
|
||||
let query = String::from_utf8_lossy(&query[..len - 5])
|
||||
.to_string()
|
||||
.to_ascii_uppercase();
|
||||
|
||||
trace!("Admin query: {}", query);
|
||||
|
||||
if query.starts_with("SHOW STATS") {
|
||||
trace!("SHOW STATS");
|
||||
show_stats(stream).await
|
||||
} else if query.starts_with("RELOAD") {
|
||||
trace!("RELOAD");
|
||||
reload(stream).await
|
||||
} else if query.starts_with("SHOW CONFIG") {
|
||||
trace!("SHOW CONFIG");
|
||||
show_config(stream).await
|
||||
} else if query.starts_with("SHOW DATABASES") {
|
||||
trace!("SHOW DATABASES");
|
||||
show_databases(stream, &pool).await
|
||||
} else if query.starts_with("SHOW POOLS") {
|
||||
trace!("SHOW POOLS");
|
||||
show_pools(stream, &pool).await
|
||||
} else if query.starts_with("SHOW LISTS") {
|
||||
trace!("SHOW LISTS");
|
||||
show_lists(stream, &pool).await
|
||||
} else if query.starts_with("SHOW VERSION") {
|
||||
trace!("SHOW VERSION");
|
||||
show_version(stream).await
|
||||
} else if query.starts_with("SET ") {
|
||||
trace!("SET");
|
||||
ignore_set(stream).await
|
||||
} else {
|
||||
error_response(stream, "Unsupported query against the admin database").await
|
||||
}
|
||||
}
|
||||
|
||||
/// SHOW LISTS
|
||||
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
let stats = get_stats();
|
||||
|
||||
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
res.put(data_row(&vec![
|
||||
"databases".to_string(),
|
||||
(pool.databases() + 1).to_string(), // see comment below
|
||||
]));
|
||||
res.put(data_row(&vec!["users".to_string(), "1".to_string()]));
|
||||
res.put(data_row(&vec![
|
||||
"pools".to_string(),
|
||||
(pool.databases() + 1).to_string(), // +1 for the pgbouncer admin db pool which isn't real
|
||||
])); // but admin tools that work with pgbouncer want this
|
||||
res.put(data_row(&vec![
|
||||
"free_clients".to_string(),
|
||||
stats["cl_idle"].to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_clients".to_string(),
|
||||
stats["cl_active"].to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"login_clients".to_string(),
|
||||
"0".to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"free_servers".to_string(),
|
||||
stats["sv_idle"].to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_servers".to_string(),
|
||||
stats["sv_active"].to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
|
||||
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
|
||||
res.put(data_row(&vec!["dns_queries".to_string(), "0".to_string()]));
|
||||
res.put(data_row(&vec!["dns_pending".to_string(), "0".to_string()]));
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW VERSION
|
||||
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
res.put(row_description(&vec![("version", DataType::Text)]));
|
||||
res.put(data_row(&vec!["PgCat 0.1.0".to_string()]));
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW POOLS
|
||||
async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Result<(), Error> {
|
||||
let stats = get_stats();
|
||||
let config = {
|
||||
let guard = get_config();
|
||||
&*guard.clone()
|
||||
};
|
||||
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("cl_active", DataType::Numeric),
|
||||
("cl_waiting", DataType::Numeric),
|
||||
("cl_cancel_req", DataType::Numeric),
|
||||
("sv_active", DataType::Numeric),
|
||||
("sv_idle", DataType::Numeric),
|
||||
("sv_used", DataType::Numeric),
|
||||
("sv_tested", DataType::Numeric),
|
||||
("sv_login", DataType::Numeric),
|
||||
("maxwait", DataType::Numeric),
|
||||
("maxwait_us", DataType::Numeric),
|
||||
("pool_mode", DataType::Text),
|
||||
];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
let mut row = vec![String::from("all"), config.user.name.clone()];
|
||||
|
||||
for column in &columns[2..columns.len() - 1] {
|
||||
let value = stats.get(column.0).unwrap_or(&0).to_string();
|
||||
row.push(value);
|
||||
}
|
||||
|
||||
row.push(config.general.pool_mode.to_string());
|
||||
|
||||
res.put(data_row(&row));
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW DATABASES
|
||||
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
let guard = get_config();
|
||||
let config = &*guard.clone();
|
||||
drop(guard);
|
||||
|
||||
// Columns
|
||||
let columns = vec![
|
||||
("name", DataType::Text),
|
||||
("host", DataType::Text),
|
||||
("port", DataType::Text),
|
||||
("database", DataType::Text),
|
||||
("force_user", DataType::Text),
|
||||
("pool_size", DataType::Int4),
|
||||
("min_pool_size", DataType::Int4),
|
||||
("reserve_pool", DataType::Int4),
|
||||
("pool_mode", DataType::Text),
|
||||
("max_connections", DataType::Int4),
|
||||
("current_connections", DataType::Int4),
|
||||
("paused", DataType::Int4),
|
||||
("disabled", DataType::Int4),
|
||||
];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
// RowDescription
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for shard in 0..pool.shards() {
|
||||
let database_name = &config.shards[&shard.to_string()].database;
|
||||
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let pool_state = pool.pool_state(shard, server);
|
||||
|
||||
res.put(data_row(&vec![
|
||||
address.name(), // name
|
||||
address.host.to_string(), // host
|
||||
address.port.to_string(), // port
|
||||
database_name.to_string(), // database
|
||||
config.user.name.to_string(), // force_user
|
||||
config.general.pool_size.to_string(), // pool_size
|
||||
"0".to_string(), // min_pool_size
|
||||
"0".to_string(), // reserve_pool
|
||||
config.general.pool_mode.to_string(), // pool_mode
|
||||
config.general.pool_size.to_string(), // max_connections
|
||||
pool_state.connections.to_string(), // current_connections
|
||||
"0".to_string(), // paused
|
||||
"0".to_string(), // disabled
|
||||
]));
|
||||
}
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// Ignore any SET commands the client sends.
|
||||
/// This is common initialization done by ORMs.
|
||||
async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
custom_protocol_response_ok(stream, "SET").await
|
||||
}
|
||||
|
||||
/// RELOAD
|
||||
async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
info!("Reloading config");
|
||||
|
||||
let config = get_config();
|
||||
let path = config.path.clone().unwrap();
|
||||
|
||||
parse(&path).await?;
|
||||
|
||||
let config = get_config();
|
||||
|
||||
config.show();
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
// CommandComplete
|
||||
res.put(command_complete("RELOAD"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
let guard = get_config();
|
||||
let config = &*guard.clone();
|
||||
let config: HashMap<String, String> = config.into();
|
||||
drop(guard);
|
||||
|
||||
// Configs that cannot be changed dynamically.
|
||||
let immutables = ["host", "port", "connect_timeout"];
|
||||
|
||||
// Columns
|
||||
let columns = vec![
|
||||
("key", DataType::Text),
|
||||
("value", DataType::Text),
|
||||
("default", DataType::Text),
|
||||
("changeable", DataType::Text),
|
||||
];
|
||||
|
||||
// Response data
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
// DataRow rows
|
||||
for (key, value) in config {
|
||||
let changeable = if immutables.iter().filter(|col| *col == &key).count() == 1 {
|
||||
"no".to_string()
|
||||
} else {
|
||||
"yes".to_string()
|
||||
};
|
||||
|
||||
let row = vec![key, value, "-".to_string(), changeable];
|
||||
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// SHOW STATS
|
||||
async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("total_xact_count", DataType::Numeric),
|
||||
("total_query_count", DataType::Numeric),
|
||||
("total_received", DataType::Numeric),
|
||||
("total_sent", DataType::Numeric),
|
||||
("total_xact_time", DataType::Numeric),
|
||||
("total_query_time", DataType::Numeric),
|
||||
("total_wait_time", DataType::Numeric),
|
||||
("avg_xact_count", DataType::Numeric),
|
||||
("avg_query_count", DataType::Numeric),
|
||||
("avg_recv", DataType::Numeric),
|
||||
("avg_sent", DataType::Numeric),
|
||||
("avg_xact_time", DataType::Numeric),
|
||||
("avg_query_time", DataType::Numeric),
|
||||
("avg_wait_time", DataType::Numeric),
|
||||
];
|
||||
|
||||
let stats = get_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
let mut row = vec![
|
||||
String::from("all"), // TODO: per-database stats,
|
||||
];
|
||||
|
||||
for column in &columns[1..] {
|
||||
row.push(stats.get(column.0).unwrap_or(&0).to_string());
|
||||
}
|
||||
|
||||
res.put(data_row(&row));
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
@@ -2,7 +2,7 @@
|
||||
/// We are pretending to the server in this scenario,
|
||||
/// and this module implements that.
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::error;
|
||||
use log::{debug, error, trace};
|
||||
use tokio::io::{AsyncReadExt, BufReader};
|
||||
use tokio::net::{
|
||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||
@@ -11,6 +11,7 @@ use tokio::net::{
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::admin::handle_admin;
|
||||
use crate::config::get_config;
|
||||
use crate::constants::*;
|
||||
use crate::errors::Error;
|
||||
@@ -54,6 +55,9 @@ pub struct Client {
|
||||
|
||||
// Statistics
|
||||
stats: Reporter,
|
||||
|
||||
// Clients want to talk to admin
|
||||
admin: bool,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
@@ -70,6 +74,8 @@ impl Client {
|
||||
let transaction_mode = config.general.pool_mode.starts_with("t");
|
||||
drop(config);
|
||||
loop {
|
||||
trace!("Waiting for StartupMessage");
|
||||
|
||||
// Could be StartupMessage or SSLRequest
|
||||
// which makes this variable length.
|
||||
let len = match stream.read_i32().await {
|
||||
@@ -91,6 +97,8 @@ impl Client {
|
||||
match code {
|
||||
// Client wants SSL. We don't support it at the moment.
|
||||
SSL_REQUEST_CODE => {
|
||||
trace!("Rejecting SSLRequest");
|
||||
|
||||
let mut no = BytesMut::with_capacity(1);
|
||||
no.put_u8(b'N');
|
||||
|
||||
@@ -99,17 +107,32 @@ impl Client {
|
||||
|
||||
// Regular startup message.
|
||||
PROTOCOL_VERSION_NUMBER => {
|
||||
// TODO: perform actual auth.
|
||||
trace!("Got StartupMessage");
|
||||
let parameters = parse_startup(bytes.clone())?;
|
||||
let mut user_name: String = String::new();
|
||||
match parameters.get(&"user") {
|
||||
Some(&user) => user_name = user,
|
||||
None => return Err(Error::ClientBadStartup),
|
||||
}
|
||||
start_auth(&mut stream, &user_name).await?;
|
||||
|
||||
// Generate random backend ID and secret key
|
||||
let process_id: i32 = rand::random();
|
||||
let secret_key: i32 = rand::random();
|
||||
|
||||
auth_ok(&mut stream).await?;
|
||||
write_all(&mut stream, server_info).await?;
|
||||
backend_key_data(&mut stream, process_id, secret_key).await?;
|
||||
ready_for_query(&mut stream).await?;
|
||||
trace!("Startup OK");
|
||||
|
||||
let database = parameters
|
||||
.get("database")
|
||||
.unwrap_or(parameters.get("user").unwrap());
|
||||
let admin = ["pgcat", "pgbouncer"]
|
||||
.iter()
|
||||
.filter(|db| *db == &database)
|
||||
.count()
|
||||
== 1;
|
||||
|
||||
// Split the read and write streams
|
||||
// so we can control buffering.
|
||||
@@ -126,6 +149,7 @@ impl Client {
|
||||
client_server_map: client_server_map,
|
||||
parameters: parameters,
|
||||
stats: stats,
|
||||
admin: admin,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -147,6 +171,7 @@ impl Client {
|
||||
client_server_map: client_server_map,
|
||||
parameters: HashMap::new(),
|
||||
stats: stats,
|
||||
admin: false,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -161,8 +186,10 @@ impl Client {
|
||||
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
|
||||
// The client wants to cancel a query it has issued previously.
|
||||
if self.cancel_mode {
|
||||
trace!("Sending CancelRequest");
|
||||
|
||||
let (process_id, secret_key, address, port) = {
|
||||
let guard = self.client_server_map.lock().unwrap();
|
||||
let guard = self.client_server_map.lock();
|
||||
|
||||
match guard.get(&(self.process_id, self.secret_key)) {
|
||||
// Drop the mutex as soon as possible.
|
||||
@@ -193,6 +220,8 @@ impl Client {
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
// or issue commands for our sharding and server selection protocols.
|
||||
loop {
|
||||
trace!("Client idle, waiting for message");
|
||||
|
||||
// Client idle, waiting for messages.
|
||||
self.stats.client_idle(self.process_id);
|
||||
|
||||
@@ -203,6 +232,19 @@ impl Client {
|
||||
// SET SHARDING KEY TO 'bigint';
|
||||
let mut message = read_message(&mut self.read).await?;
|
||||
|
||||
// Avoid taking a server if the client just wants to disconnect.
|
||||
if message[0] as char == 'X' {
|
||||
trace!("Client disconnecting");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Handle admin database real quick
|
||||
if self.admin {
|
||||
trace!("Handling admin command");
|
||||
handle_admin(&mut self.write, message, pool.clone()).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle all custom protocol commands here.
|
||||
match query_router.try_execute_command(message.clone()) {
|
||||
// Normal query
|
||||
@@ -212,11 +254,17 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
Some((Command::SetShard, _)) | Some((Command::SetShardingKey, _)) => {
|
||||
Some((Command::SetShard, _)) => {
|
||||
custom_protocol_response_ok(&mut self.write, &format!("SET SHARD")).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
Some((Command::SetShardingKey, _)) => {
|
||||
custom_protocol_response_ok(&mut self.write, &format!("SET SHARDING KEY"))
|
||||
.await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
Some((Command::SetServerRole, _)) => {
|
||||
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
|
||||
continue;
|
||||
@@ -250,9 +298,14 @@ impl Client {
|
||||
// Waiting for server connection.
|
||||
self.stats.client_waiting(self.process_id);
|
||||
|
||||
debug!("Waiting for connection from pool");
|
||||
|
||||
// Grab a server from the pool: the client issued a regular query.
|
||||
let connection = match pool.get(query_router.shard(), query_router.role()).await {
|
||||
Ok(conn) => conn,
|
||||
Ok(conn) => {
|
||||
debug!("Got connection from pool");
|
||||
conn
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Could not get connection from pool: {:?}", err);
|
||||
error_response(&mut self.write, "could not get connection from the pool")
|
||||
@@ -272,11 +325,19 @@ impl Client {
|
||||
self.stats.client_active(self.process_id);
|
||||
self.stats.server_active(server.process_id());
|
||||
|
||||
debug!(
|
||||
"Client {:?} talking to server {:?}",
|
||||
self.write.peer_addr().unwrap(),
|
||||
server.address()
|
||||
);
|
||||
|
||||
// Transaction loop. Multiple queries can be issued by the client here.
|
||||
// The connection belongs to the client until the transaction is over,
|
||||
// or until the client disconnects if we are in session mode.
|
||||
loop {
|
||||
let mut message = if message.len() == 0 {
|
||||
trace!("Waiting for message inside transaction or in session mode");
|
||||
|
||||
match read_message(&mut self.read).await {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
@@ -303,9 +364,13 @@ impl Client {
|
||||
let code = message.get_u8() as char;
|
||||
let _len = message.get_i32() as usize;
|
||||
|
||||
trace!("Message: {}", code);
|
||||
|
||||
match code {
|
||||
// ReadyForQuery
|
||||
'Q' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
// TODO: implement retries here for read-only transactions.
|
||||
server.send(original).await?;
|
||||
|
||||
@@ -387,6 +452,8 @@ impl Client {
|
||||
// Sync
|
||||
// Frontend (client) is asking for the query result now.
|
||||
'S' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
self.buffer.put(&original[..]);
|
||||
|
||||
// TODO: retries for read-only transactions.
|
||||
@@ -471,13 +538,14 @@ impl Client {
|
||||
}
|
||||
|
||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||
debug!("Releasing server back into the pool");
|
||||
self.release();
|
||||
}
|
||||
}
|
||||
|
||||
/// Release the server from being mine. I can't cancel its queries anymore.
|
||||
pub fn release(&self) {
|
||||
let mut guard = self.client_server_map.lock().unwrap();
|
||||
let mut guard = self.client_server_map.lock();
|
||||
guard.remove(&(self.process_id, self.secret_key));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,15 @@ pub enum Role {
|
||||
Replica,
|
||||
}
|
||||
|
||||
impl ToString for Role {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
Role::Primary => "primary".to_string(),
|
||||
Role::Replica => "replica".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq<Option<Role>> for Role {
|
||||
fn eq(&self, other: &Option<Role>) -> bool {
|
||||
match other {
|
||||
@@ -43,6 +52,7 @@ pub struct Address {
|
||||
pub port: String,
|
||||
pub shard: usize,
|
||||
pub role: Role,
|
||||
pub replica_number: usize,
|
||||
}
|
||||
|
||||
impl Default for Address {
|
||||
@@ -51,11 +61,22 @@ impl Default for Address {
|
||||
host: String::from("127.0.0.1"),
|
||||
port: String::from("5432"),
|
||||
shard: 0,
|
||||
replica_number: 0,
|
||||
role: Role::Replica,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Address {
|
||||
pub fn name(&self) -> String {
|
||||
match self.role {
|
||||
Role::Primary => format!("shard_{}_primary", self.shard),
|
||||
|
||||
Role::Replica => format!("shard_{}_replica_{}", self.shard, self.replica_number),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
|
||||
pub struct User {
|
||||
pub name: String,
|
||||
@@ -118,6 +139,7 @@ pub struct QueryRouter {
|
||||
pub default_role: String,
|
||||
pub query_parser_enabled: bool,
|
||||
pub primary_reads_enabled: bool,
|
||||
pub sharding_function: String,
|
||||
}
|
||||
|
||||
impl Default for QueryRouter {
|
||||
@@ -126,12 +148,14 @@ impl Default for QueryRouter {
|
||||
default_role: String::from("any"),
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: true,
|
||||
sharding_function: "pg_bigint_hash".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub path: Option<String>,
|
||||
pub general: General,
|
||||
pub user: User,
|
||||
pub shards: HashMap<String, Shard>,
|
||||
@@ -141,6 +165,7 @@ pub struct Config {
|
||||
impl Default for Config {
|
||||
fn default() -> Config {
|
||||
Config {
|
||||
path: Some(String::from("pgcat.toml")),
|
||||
general: General::default(),
|
||||
user: User::default(),
|
||||
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
||||
@@ -149,6 +174,52 @@ impl Default for Config {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Config> for std::collections::HashMap<String, String> {
|
||||
fn from(config: &Config) -> HashMap<String, String> {
|
||||
HashMap::from([
|
||||
("host".to_string(), config.general.host.to_string()),
|
||||
("port".to_string(), config.general.port.to_string()),
|
||||
(
|
||||
"pool_size".to_string(),
|
||||
config.general.pool_size.to_string(),
|
||||
),
|
||||
(
|
||||
"pool_mode".to_string(),
|
||||
config.general.pool_mode.to_string(),
|
||||
),
|
||||
(
|
||||
"connect_timeout".to_string(),
|
||||
config.general.connect_timeout.to_string(),
|
||||
),
|
||||
(
|
||||
"healthcheck_timeout".to_string(),
|
||||
config.general.healthcheck_timeout.to_string(),
|
||||
),
|
||||
("ban_time".to_string(), config.general.ban_time.to_string()),
|
||||
(
|
||||
"statsd_address".to_string(),
|
||||
config.general.statsd_address.to_string(),
|
||||
),
|
||||
(
|
||||
"default_role".to_string(),
|
||||
config.query_router.default_role.to_string(),
|
||||
),
|
||||
(
|
||||
"query_parser_enabled".to_string(),
|
||||
config.query_router.query_parser_enabled.to_string(),
|
||||
),
|
||||
(
|
||||
"primary_reads_enabled".to_string(),
|
||||
config.query_router.primary_reads_enabled.to_string(),
|
||||
),
|
||||
(
|
||||
"sharding_function".to_string(),
|
||||
config.query_router.sharding_function.to_string(),
|
||||
),
|
||||
])
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn show(&self) {
|
||||
info!("Pool size: {}", self.general.pool_size);
|
||||
@@ -159,6 +230,8 @@ impl Config {
|
||||
self.general.healthcheck_timeout
|
||||
);
|
||||
info!("Connection timeout: {}ms", self.general.connect_timeout);
|
||||
info!("Sharding function: {}", self.query_router.sharding_function);
|
||||
info!("Number of shards: {}", self.shards.len());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,7 +258,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
let config: Config = match toml::from_str(&contents) {
|
||||
let mut config: Config = match toml::from_str(&contents) {
|
||||
Ok(config) => config,
|
||||
Err(err) => {
|
||||
error!("Could not parse config file: {}", err.to_string());
|
||||
@@ -193,6 +266,18 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
match config.query_router.sharding_function.as_ref() {
|
||||
"pg_bigint_hash" => (),
|
||||
"sha1" => (),
|
||||
_ => {
|
||||
error!(
|
||||
"Supported sharding functions are: 'pg_bigint_hash', 'sha1', got: '{}'",
|
||||
config.query_router.sharding_function
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
// Quick config sanity check.
|
||||
for shard in &config.shards {
|
||||
// We use addresses as unique identifiers,
|
||||
@@ -263,6 +348,8 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
config.path = Some(path.to_string());
|
||||
|
||||
CONFIG.store(Arc::new(config.clone()));
|
||||
|
||||
Ok(())
|
||||
@@ -280,5 +367,6 @@ mod test {
|
||||
assert_eq!(get_config().shards["1"].servers[0].0, "127.0.0.1");
|
||||
assert_eq!(get_config().shards["0"].servers[0].2, "primary");
|
||||
assert_eq!(get_config().query_router.default_role, "any");
|
||||
assert_eq!(get_config().path, Some("pgcat.toml".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,3 +20,8 @@ pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;
|
||||
|
||||
// ErrorResponse: A code identifying the field type; if zero, this is the message terminator and no string follows.
|
||||
pub const MESSAGE_TERMINATOR: u8 = 0;
|
||||
|
||||
//
|
||||
// Data types
|
||||
//
|
||||
pub const _OID_INT8: i32 = 20; // bigint
|
||||
|
||||
@@ -8,5 +8,7 @@ pub enum Error {
|
||||
// ServerTimeout,
|
||||
// DirtyServer,
|
||||
BadConfig,
|
||||
BadUserList,
|
||||
AllServersDown,
|
||||
AuthenticationError
|
||||
}
|
||||
|
||||
18
src/main.rs
18
src/main.rs
@@ -36,6 +36,7 @@ extern crate tokio;
|
||||
extern crate toml;
|
||||
|
||||
use log::{error, info};
|
||||
use parking_lot::Mutex;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::{
|
||||
signal,
|
||||
@@ -44,10 +45,12 @@ use tokio::{
|
||||
};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
|
||||
mod admin;
|
||||
mod client;
|
||||
mod config;
|
||||
mod userlist;
|
||||
mod constants;
|
||||
mod errors;
|
||||
mod messages;
|
||||
@@ -92,6 +95,15 @@ async fn main() {
|
||||
}
|
||||
};
|
||||
|
||||
// Prepare user list
|
||||
match userlist::parse("userlist.json").await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
error!("Userlist parse error: {:?}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let config = get_config();
|
||||
|
||||
let addr = format!("{}:{}", config.general.host, config.general.port);
|
||||
@@ -111,9 +123,9 @@ async fn main() {
|
||||
|
||||
// Collect statistics and send them to StatsD
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
|
||||
let collector_tx = tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let mut stats_collector = Collector::new(rx);
|
||||
let mut stats_collector = Collector::new(rx, collector_tx);
|
||||
stats_collector.collect().await;
|
||||
});
|
||||
|
||||
|
||||
266
src/messages.rs
266
src/messages.rs
@@ -1,5 +1,7 @@
|
||||
/// Helper functions to send one-off protocol messages
|
||||
/// and handle TcpStream (TCP socket).
|
||||
|
||||
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use md5::{Digest, Md5};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||
@@ -7,10 +9,125 @@ use tokio::net::{
|
||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||
TcpStream,
|
||||
};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use log::{error};
|
||||
|
||||
use crate::errors::Error;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use rand::Rng;
|
||||
|
||||
use crate::userlist::get_user_list;
|
||||
|
||||
|
||||
/// Postgres data type mappings
|
||||
/// used in RowDescription ('T') message.
|
||||
pub enum DataType {
|
||||
Text,
|
||||
Int4,
|
||||
Numeric,
|
||||
}
|
||||
|
||||
impl From<&DataType> for i32 {
|
||||
fn from(data_type: &DataType) -> i32 {
|
||||
match data_type {
|
||||
DataType::Text => 25,
|
||||
DataType::Int4 => 23,
|
||||
DataType::Numeric => 1700,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
1. Generate salt (4 bytes of random data)
|
||||
md5(concat(md5(concat(password, username)), random-salt)))
|
||||
2. Send md5 auth request
|
||||
3. recieve PasswordMessage with salt.
|
||||
4. refactor md5_password function to be reusable
|
||||
5. check username hash combo against file
|
||||
6. AuthenticationOk or ErrorResponse
|
||||
**/
|
||||
pub async fn start_auth(stream: &mut TcpStream, user_name: &String) -> Result<(), Error> {
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
//Generate random 4 byte salt
|
||||
let salt = rng.gen::<u32>();
|
||||
|
||||
// Send AuthenticationMD5Password request
|
||||
send_md5_request(stream, salt).await?;
|
||||
|
||||
let code = match stream.read_u8().await {
|
||||
Ok(code) => code as char,
|
||||
Err(_) => return Err(Error::AuthenticationError),
|
||||
};
|
||||
|
||||
match code {
|
||||
// Password response
|
||||
'p' => {
|
||||
fetch_password_and_authenticate(stream, &user_name, &salt).await?;
|
||||
Ok(auth_ok(stream).await?)
|
||||
}
|
||||
_ => {
|
||||
error!("Unknown code: {}", code);
|
||||
return Err(Error::AuthenticationError);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_md5_request(stream: &mut TcpStream, salt: u32) -> Result<(), Error> {
|
||||
let mut authentication_md5password = BytesMut::with_capacity(12);
|
||||
authentication_md5password.put_u8(b'R');
|
||||
authentication_md5password.put_i32(12);
|
||||
authentication_md5password.put_i32(5);
|
||||
authentication_md5password.put_u32(salt);
|
||||
|
||||
// Send AuthenticationMD5Password request
|
||||
Ok(write_all(stream, authentication_md5password).await?)
|
||||
}
|
||||
|
||||
pub async fn fetch_password_and_authenticate(stream: &mut TcpStream, user_name: &String, salt: &u32) -> Result<(), Error> {
|
||||
/**
|
||||
1. How do I store the lists of users and paswords? clear text or hash?? wtf
|
||||
2. Add auth to tests
|
||||
**/
|
||||
|
||||
let len = match stream.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::AuthenticationError),
|
||||
};
|
||||
|
||||
// Read whatever is left.
|
||||
let mut password_hash = vec![0u8; len as usize - 4];
|
||||
|
||||
match stream.read_exact(&mut password_hash).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::AuthenticationError),
|
||||
};
|
||||
|
||||
let user_list = get_user_list();
|
||||
let mut password: String = String::new();
|
||||
match user_list.get(&user_name) {
|
||||
Some(&p) => password = p,
|
||||
None => return Err(Error::AuthenticationError),
|
||||
}
|
||||
|
||||
let mut md5 = Md5::new();
|
||||
|
||||
// concat('md5', md5(concat(md5(concat(password, username)), random-salt)))
|
||||
// First pass
|
||||
md5.update(&password.as_bytes());
|
||||
md5.update(&user_name.as_bytes());
|
||||
let output = md5.finalize_reset();
|
||||
// Second pass
|
||||
md5.update(format!("{:x}", output));
|
||||
md5.update(salt.to_be_bytes().to_vec());
|
||||
|
||||
|
||||
let password_string: String = String::from_utf8(password_hash).expect("Could not get password hash");
|
||||
match format!("md5{:x}", md5.finalize()) == password_string {
|
||||
true => Ok(()),
|
||||
_ => Err(Error::AuthenticationError)
|
||||
}
|
||||
}
|
||||
|
||||
/// Tell the client that authentication handshake completed successfully.
|
||||
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
@@ -38,7 +155,7 @@ pub async fn backend_key_data(
|
||||
Ok(write_all(stream, key_data).await?)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// Construct a `Q`: Query message.
|
||||
pub fn simple_query(query: &str) -> BytesMut {
|
||||
let mut res = BytesMut::from(&b"Q"[..]);
|
||||
let query = format!("{}\0", query);
|
||||
@@ -91,9 +208,8 @@ pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Resu
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse StartupMessage parameters.
|
||||
/// e.g. user, database, application_name, etc.
|
||||
pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||
/// Parse the params the server sends as a key/value format.
|
||||
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||
let mut result = HashMap::new();
|
||||
let mut buf = Vec::new();
|
||||
let mut tmp = String::new();
|
||||
@@ -115,7 +231,7 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
|
||||
|
||||
// Expect pairs of name and value
|
||||
// and at least one pair to be present.
|
||||
if buf.len() % 2 != 0 && buf.len() >= 2 {
|
||||
if buf.len() % 2 != 0 || buf.len() < 2 {
|
||||
return Err(Error::ClientBadStartup);
|
||||
}
|
||||
|
||||
@@ -127,6 +243,14 @@ pub fn parse_startup(mut bytes: BytesMut) -> Result<HashMap<String, String>, Err
|
||||
i += 2;
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Parse StartupMessage parameters.
|
||||
/// e.g. user, database, application_name, etc.
|
||||
pub fn parse_startup(bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||
let result = parse_params(bytes)?;
|
||||
|
||||
// Minimum required parameters
|
||||
// I want to have the user at the very minimum, according to the protocol spec.
|
||||
if !result.contains_key("user") {
|
||||
@@ -252,68 +376,17 @@ pub async fn show_response(
|
||||
// 3. CommandComplete
|
||||
// 4. ReadyForQuery
|
||||
|
||||
// RowDescription
|
||||
let mut row_desc = BytesMut::new();
|
||||
|
||||
// Number of columns: 1
|
||||
row_desc.put_i16(1);
|
||||
|
||||
// Column name
|
||||
row_desc.put_slice(&format!("{}\0", name).as_bytes());
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i32(0);
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i16(0);
|
||||
|
||||
// Text
|
||||
row_desc.put_i32(25);
|
||||
|
||||
// Text size = variable (-1)
|
||||
row_desc.put_i16(-1);
|
||||
|
||||
// Type modifier: none that I know
|
||||
row_desc.put_i32(0);
|
||||
|
||||
// Format being used: text (0), binary (1)
|
||||
row_desc.put_i16(0);
|
||||
|
||||
// DataRow
|
||||
let mut data_row = BytesMut::new();
|
||||
|
||||
// Number of columns
|
||||
data_row.put_i16(1);
|
||||
|
||||
// Size of the column content (length of the string really)
|
||||
data_row.put_i32(value.len() as i32);
|
||||
|
||||
// The content
|
||||
data_row.put_slice(value.as_bytes());
|
||||
|
||||
// CommandComplete
|
||||
let mut command_complete = BytesMut::new();
|
||||
|
||||
// Number of rows returned (just one)
|
||||
command_complete.put_slice(&b"SELECT 1\0"[..]);
|
||||
|
||||
// The final messages sent to the client
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
// RowDescription
|
||||
res.put_u8(b'T');
|
||||
res.put_i32(row_desc.len() as i32 + 4);
|
||||
res.put(row_desc);
|
||||
res.put(row_description(&vec![(name, DataType::Text)]));
|
||||
|
||||
// DataRow
|
||||
res.put_u8(b'D');
|
||||
res.put_i32(data_row.len() as i32 + 4);
|
||||
res.put(data_row);
|
||||
res.put(data_row(&vec![value.to_string()]));
|
||||
|
||||
// CommandComplete
|
||||
res.put_u8(b'C');
|
||||
res.put_i32(command_complete.len() as i32 + 4);
|
||||
res.put(command_complete);
|
||||
res.put(command_complete("SELECT 1"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
@@ -323,6 +396,77 @@ pub async fn show_response(
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
|
||||
let mut res = BytesMut::new();
|
||||
let mut row_desc = BytesMut::new();
|
||||
|
||||
// how many colums we are storing
|
||||
row_desc.put_i16(columns.len() as i16);
|
||||
|
||||
for (name, data_type) in columns {
|
||||
// Column name
|
||||
row_desc.put_slice(&format!("{}\0", name).as_bytes());
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i32(0);
|
||||
|
||||
// Doesn't belong to any table
|
||||
row_desc.put_i16(0);
|
||||
|
||||
// Text
|
||||
row_desc.put_i32(data_type.into());
|
||||
|
||||
// Text size = variable (-1)
|
||||
let type_size = match data_type {
|
||||
DataType::Text => -1,
|
||||
DataType::Int4 => 4,
|
||||
DataType::Numeric => -1,
|
||||
};
|
||||
|
||||
row_desc.put_i16(type_size);
|
||||
|
||||
// Type modifier: none that I know
|
||||
row_desc.put_i32(-1);
|
||||
|
||||
// Format being used: text (0), binary (1)
|
||||
row_desc.put_i16(0);
|
||||
}
|
||||
|
||||
res.put_u8(b'T');
|
||||
res.put_i32(row_desc.len() as i32 + 4);
|
||||
res.put(row_desc);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn data_row(row: &Vec<String>) -> BytesMut {
|
||||
let mut res = BytesMut::new();
|
||||
let mut data_row = BytesMut::new();
|
||||
|
||||
data_row.put_i16(row.len() as i16);
|
||||
|
||||
for column in row {
|
||||
let column = column.as_bytes();
|
||||
data_row.put_i32(column.len() as i32);
|
||||
data_row.put_slice(&column);
|
||||
}
|
||||
|
||||
res.put_u8(b'D');
|
||||
res.put_i32(data_row.len() as i32 + 4);
|
||||
res.put(data_row);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn command_complete(command: &str) -> BytesMut {
|
||||
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
||||
let mut res = BytesMut::new();
|
||||
res.put_u8(b'C');
|
||||
res.put_i32(cmd.len() as i32 + 4);
|
||||
res.put(cmd);
|
||||
res
|
||||
}
|
||||
|
||||
/// Write all data in the buffer to the TcpStream.
|
||||
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
||||
match stream.write_all(&buf).await {
|
||||
|
||||
77
src/pool.rs
77
src/pool.rs
@@ -3,7 +3,8 @@ use async_trait::async_trait;
|
||||
use bb8::{ManageConnection, Pool, PooledConnection};
|
||||
use bytes::BytesMut;
|
||||
use chrono::naive::NaiveDateTime;
|
||||
use log::{error, info, warn};
|
||||
use log::{debug, error, info, warn};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
|
||||
use crate::config::{get_config, Address, Role, User};
|
||||
use crate::errors::Error;
|
||||
@@ -11,11 +12,11 @@ use crate::server::Server;
|
||||
use crate::stats::Reporter;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
// Banlist: bad servers go in here.
|
||||
pub type BanList = Arc<Mutex<Vec<HashMap<Address, NaiveDateTime>>>>;
|
||||
pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
|
||||
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -48,7 +49,8 @@ impl ConnectionPool {
|
||||
for shard_idx in shard_ids {
|
||||
let shard = &config.shards[&shard_idx];
|
||||
let mut pools = Vec::new();
|
||||
let mut replica_addresses = Vec::new();
|
||||
let mut servers = Vec::new();
|
||||
let mut replica_number = 0;
|
||||
|
||||
for server in shard.servers.iter() {
|
||||
let role = match server.2.as_ref() {
|
||||
@@ -64,9 +66,14 @@ impl ConnectionPool {
|
||||
host: server.0.clone(),
|
||||
port: server.1.to_string(),
|
||||
role: role,
|
||||
replica_number,
|
||||
shard: shard_idx.parse::<usize>().unwrap(),
|
||||
};
|
||||
|
||||
if role == Role::Replica {
|
||||
replica_number += 1;
|
||||
}
|
||||
|
||||
let manager = ServerPool::new(
|
||||
address.clone(),
|
||||
config.user.clone(),
|
||||
@@ -86,11 +93,11 @@ impl ConnectionPool {
|
||||
.unwrap();
|
||||
|
||||
pools.push(pool);
|
||||
replica_addresses.push(address);
|
||||
servers.push(address);
|
||||
}
|
||||
|
||||
shards.push(pools);
|
||||
addresses.push(replica_addresses);
|
||||
addresses.push(servers);
|
||||
banlist.push(HashMap::new());
|
||||
}
|
||||
|
||||
@@ -101,7 +108,7 @@ impl ConnectionPool {
|
||||
databases: shards,
|
||||
addresses: addresses,
|
||||
round_robin: rand::random::<usize>() % address_len, // Start at a random replica
|
||||
banlist: Arc::new(Mutex::new(banlist)),
|
||||
banlist: Arc::new(RwLock::new(banlist)),
|
||||
stats: stats,
|
||||
}
|
||||
}
|
||||
@@ -125,10 +132,22 @@ impl ConnectionPool {
|
||||
};
|
||||
|
||||
let mut proxy = connection.0;
|
||||
let _address = connection.1;
|
||||
let address = connection.1;
|
||||
let server = &mut *proxy;
|
||||
|
||||
server_infos.push(server.server_info());
|
||||
let server_info = server.server_info();
|
||||
|
||||
if server_infos.len() > 0 {
|
||||
// Compare against the last server checked.
|
||||
if server_info != server_infos[server_infos.len() - 1] {
|
||||
warn!(
|
||||
"{:?} has different server configuration than the last server",
|
||||
address
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
server_infos.push(server_info);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,6 +180,8 @@ impl ConnectionPool {
|
||||
_ => addresses.len(),
|
||||
};
|
||||
|
||||
debug!("Allowed attempts for {:?}: {}", role, allowed_attempts);
|
||||
|
||||
let exists = match role {
|
||||
Some(role) => addresses.iter().filter(|addr| addr.role == role).count() > 0,
|
||||
None => true,
|
||||
@@ -251,14 +272,14 @@ impl ConnectionPool {
|
||||
pub fn ban(&self, address: &Address, shard: usize) {
|
||||
error!("Banning {:?}", address);
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
let mut guard = self.banlist.lock().unwrap();
|
||||
let mut guard = self.banlist.write();
|
||||
guard[shard].insert(address.clone(), now);
|
||||
}
|
||||
|
||||
/// Clear the replica to receive traffic again. Takes effect immediately
|
||||
/// for all new transactions.
|
||||
pub fn _unban(&self, address: &Address, shard: usize) {
|
||||
let mut guard = self.banlist.lock().unwrap();
|
||||
let mut guard = self.banlist.write();
|
||||
guard[shard].remove(address);
|
||||
}
|
||||
|
||||
@@ -274,12 +295,14 @@ impl ConnectionPool {
|
||||
Some(Role::Primary) => return false, // Primary cannot be banned.
|
||||
};
|
||||
|
||||
// If you're not asking for the primary,
|
||||
// all databases are treated as replicas.
|
||||
let mut guard = self.banlist.lock().unwrap();
|
||||
debug!("Available targets for {:?}: {}", role, replicas_available);
|
||||
|
||||
let guard = self.banlist.read();
|
||||
|
||||
// Everything is banned = nothing is banned.
|
||||
if guard[shard].len() == replicas_available {
|
||||
drop(guard);
|
||||
let mut guard = self.banlist.write();
|
||||
guard[shard].clear();
|
||||
drop(guard);
|
||||
warn!("Unbanning all replicas.");
|
||||
@@ -291,16 +314,24 @@ impl ConnectionPool {
|
||||
Some(timestamp) => {
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
let config = get_config();
|
||||
|
||||
// Ban expired.
|
||||
if now.timestamp() - timestamp.timestamp() > config.general.ban_time {
|
||||
drop(guard);
|
||||
warn!("Unbanning {:?}", address);
|
||||
let mut guard = self.banlist.write();
|
||||
guard[shard].remove(address);
|
||||
false
|
||||
} else {
|
||||
debug!("{:?} is banned", address);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
None => false,
|
||||
None => {
|
||||
debug!("{:?} is ok", address);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,6 +342,22 @@ impl ConnectionPool {
|
||||
pub fn servers(&self, shard: usize) -> usize {
|
||||
self.addresses[shard].len()
|
||||
}
|
||||
|
||||
pub fn databases(&self) -> usize {
|
||||
let mut databases = 0;
|
||||
for shard in 0..self.shards() {
|
||||
databases += self.servers(shard);
|
||||
}
|
||||
databases
|
||||
}
|
||||
|
||||
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
|
||||
self.databases[shard][server].state()
|
||||
}
|
||||
|
||||
pub fn address(&self, shard: usize, server: usize) -> &Address {
|
||||
&self.addresses[shard][server]
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ServerPool {
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
use crate::config::{get_config, Role};
|
||||
use crate::sharding::Sharder;
|
||||
use crate::sharding::{Sharder, ShardingFunction};
|
||||
/// Route queries automatically based on explicitely requested
|
||||
/// or implied query characteristics.
|
||||
use bytes::{Buf, BytesMut};
|
||||
use log::{debug, error};
|
||||
use once_cell::sync::OnceCell;
|
||||
use regex::RegexSet;
|
||||
use regex::{Regex, RegexSet};
|
||||
use sqlparser::ast::Statement::{Query, StartTransaction};
|
||||
use sqlparser::dialect::PostgreSqlDialect;
|
||||
use sqlparser::parser::Parser;
|
||||
|
||||
const CUSTOM_SQL_REGEXES: [&str; 5] = [
|
||||
r"(?i)SET SHARDING KEY TO '[0-9]+'",
|
||||
r"(?i)SET SHARD TO '[0-9]+'",
|
||||
r"(?i)SHOW SHARD",
|
||||
r"(?i)SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)'",
|
||||
r"(?i)SHOW SERVER ROLE",
|
||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||
r"(?i)^ *SET SHARD TO '?([0-9]+)'? *;? *$",
|
||||
r"(?i)^ *SHOW SHARD *;? *$",
|
||||
r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
|
||||
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
||||
];
|
||||
|
||||
#[derive(PartialEq, Debug)]
|
||||
@@ -27,8 +27,12 @@ pub enum Command {
|
||||
ShowServerRole,
|
||||
}
|
||||
|
||||
// Quick test
|
||||
static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
|
||||
|
||||
// Capture value
|
||||
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
|
||||
|
||||
pub struct QueryRouter {
|
||||
// By default, queries go here, unless we have better information
|
||||
// about what the client wants.
|
||||
@@ -48,6 +52,9 @@ pub struct QueryRouter {
|
||||
|
||||
// Should we try to parse queries?
|
||||
query_parser_enabled: bool,
|
||||
|
||||
// Which sharding function are we using?
|
||||
sharding_function: ShardingFunction,
|
||||
}
|
||||
|
||||
impl QueryRouter {
|
||||
@@ -60,6 +67,21 @@ impl QueryRouter {
|
||||
}
|
||||
};
|
||||
|
||||
let list: Vec<_> = CUSTOM_SQL_REGEXES
|
||||
.iter()
|
||||
.map(|rgx| Regex::new(rgx).unwrap())
|
||||
.collect();
|
||||
|
||||
// Impossible
|
||||
if list.len() != set.len() {
|
||||
return false;
|
||||
}
|
||||
|
||||
match CUSTOM_SQL_REGEX_LIST.set(list) {
|
||||
Ok(_) => true,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
match CUSTOM_SQL_REGEX_SET.set(set) {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
@@ -76,6 +98,12 @@ impl QueryRouter {
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let sharding_function = match config.query_router.sharding_function.as_ref() {
|
||||
"pg_bigint_hash" => ShardingFunction::PgBigintHash,
|
||||
"sha1" => ShardingFunction::Sha1,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
QueryRouter {
|
||||
default_server_role: default_server_role,
|
||||
shards: config.shards.len(),
|
||||
@@ -84,6 +112,7 @@ impl QueryRouter {
|
||||
active_shard: None,
|
||||
primary_reads_enabled: config.query_router.primary_reads_enabled,
|
||||
query_parser_enabled: config.query_router.query_parser_enabled,
|
||||
sharding_function,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,6 +132,11 @@ impl QueryRouter {
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let regex_list = match CUSTOM_SQL_REGEX_LIST.get() {
|
||||
Some(regex_list) => regex_list,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let matches: Vec<_> = regex_set.matches(&query).into_iter().collect();
|
||||
|
||||
if matches.len() != 1 {
|
||||
@@ -120,7 +154,19 @@ impl QueryRouter {
|
||||
|
||||
let mut value = match command {
|
||||
Command::SetShardingKey | Command::SetShard | Command::SetServerRole => {
|
||||
query.split("'").collect::<Vec<&str>>()[1].to_string()
|
||||
// Capture value. I know this re-runs the regex engine, but I haven't
|
||||
// figured out a better way just yet. I think I can write a single Regex
|
||||
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
|
||||
//
|
||||
// I think this is faster than running the Regex engine 5 times, so
|
||||
// this is a strong maybe for me so far.
|
||||
match regex_list[matches[0]].captures(&query) {
|
||||
Some(captures) => match captures.get(1) {
|
||||
Some(value) => value.as_str().to_string(),
|
||||
None => return None,
|
||||
},
|
||||
None => return None,
|
||||
}
|
||||
}
|
||||
|
||||
Command::ShowShard => self.shard().to_string(),
|
||||
@@ -139,8 +185,8 @@ impl QueryRouter {
|
||||
|
||||
match command {
|
||||
Command::SetShardingKey => {
|
||||
let sharder = Sharder::new(self.shards);
|
||||
let shard = sharder.pg_bigint_hash(value.parse::<i64>().unwrap());
|
||||
let sharder = Sharder::new(self.shards, self.sharding_function);
|
||||
let shard = sharder.shard(value.parse::<i64>().unwrap());
|
||||
self.active_shard = Some(shard);
|
||||
value = shard.to_string();
|
||||
}
|
||||
@@ -194,7 +240,12 @@ impl QueryRouter {
|
||||
let len = buf.get_i32() as usize;
|
||||
|
||||
let query = match code {
|
||||
'Q' => String::from_utf8_lossy(&buf[..len - 5]).to_string(),
|
||||
'Q' => {
|
||||
let query = String::from_utf8_lossy(&buf[..len - 5]).to_string();
|
||||
debug!("Query: '{}'", query);
|
||||
query
|
||||
}
|
||||
|
||||
'P' => {
|
||||
let mut start = 0;
|
||||
let mut end;
|
||||
@@ -213,6 +264,8 @@ impl QueryRouter {
|
||||
|
||||
let query = String::from_utf8_lossy(&buf[start..end]).to_string();
|
||||
|
||||
debug!("Prepared statement: '{}'", query);
|
||||
|
||||
query.replace("$", "") // Remove placeholders turning them into "values"
|
||||
}
|
||||
_ => return false,
|
||||
@@ -221,7 +274,7 @@ impl QueryRouter {
|
||||
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
|
||||
Ok(ast) => ast,
|
||||
Err(err) => {
|
||||
debug!("{:?}, query: {}", err, query);
|
||||
debug!("{}", err.to_string());
|
||||
return false;
|
||||
}
|
||||
};
|
||||
@@ -394,14 +447,38 @@ mod test {
|
||||
"set server role to 'any'",
|
||||
"set server role to 'auto'",
|
||||
"show server role",
|
||||
// No quotes
|
||||
"SET SHARDING KEY TO 11235",
|
||||
"SET SHARD TO 15",
|
||||
// Spaces and semicolon
|
||||
" SET SHARDING KEY TO 11235 ; ",
|
||||
" SET SHARD TO 15; ",
|
||||
" SET SHARDING KEY TO 11235 ;",
|
||||
" SET SERVER ROLE TO 'primary'; ",
|
||||
" SET SERVER ROLE TO 'primary' ; ",
|
||||
" SET SERVER ROLE TO 'primary' ;",
|
||||
];
|
||||
|
||||
// Which regexes it'll match to in the list
|
||||
let matches = [
|
||||
0, 1, 2, 3, 3, 3, 3, 4, 0, 1, 2, 3, 3, 3, 3, 4, 0, 1, 0, 1, 0, 3, 3, 3,
|
||||
];
|
||||
|
||||
let list = CUSTOM_SQL_REGEX_LIST.get().unwrap();
|
||||
let set = CUSTOM_SQL_REGEX_SET.get().unwrap();
|
||||
|
||||
for test in &tests {
|
||||
let matches: Vec<_> = set.matches(test).into_iter().collect();
|
||||
for (i, test) in tests.iter().enumerate() {
|
||||
assert!(list[matches[i]].is_match(test));
|
||||
assert_eq!(set.matches(test).into_iter().collect::<Vec<_>>().len(), 1);
|
||||
}
|
||||
|
||||
assert_eq!(matches.len(), 1);
|
||||
let bad = [
|
||||
"SELECT * FROM table",
|
||||
"SELECT * FROM table WHERE value = 'set sharding key to 5'", // Don't capture things in the middle of the query
|
||||
];
|
||||
|
||||
for query in &bad {
|
||||
assert_eq!(set.matches(query).into_iter().collect::<Vec<_>>().len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -411,7 +488,7 @@ mod test {
|
||||
let mut qr = QueryRouter::new();
|
||||
|
||||
// SetShardingKey
|
||||
let query = simple_query("SET SHARDING KEY TO '13'");
|
||||
let query = simple_query("SET SHARDING KEY TO 13");
|
||||
assert_eq!(
|
||||
qr.try_execute_command(query),
|
||||
Some((Command::SetShardingKey, String::from("1")))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
///! Implementation of the PostgreSQL server (database) protocol.
|
||||
///! Here we are pretending to the a Postgres client.
|
||||
use log::{error, info};
|
||||
use log::{debug, error, info, trace};
|
||||
use tokio::io::{AsyncReadExt, BufReader};
|
||||
use tokio::net::{
|
||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||
@@ -75,6 +75,8 @@ impl Server {
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Sending StartupMessage");
|
||||
|
||||
// Send the startup packet telling the server we're a normal Postgres client.
|
||||
startup(&mut stream, &user.name, database).await?;
|
||||
|
||||
@@ -95,6 +97,8 @@ impl Server {
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
trace!("Message: {}", code);
|
||||
|
||||
match code {
|
||||
// Authentication
|
||||
'R' => {
|
||||
@@ -104,6 +108,8 @@ impl Server {
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
trace!("Auth: {}", auth_code);
|
||||
|
||||
match auth_code {
|
||||
MD5_ENCRYPTED_PASSWORD => {
|
||||
// The salt is 4 bytes.
|
||||
@@ -135,6 +141,8 @@ impl Server {
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
trace!("Error: {}", error_code);
|
||||
|
||||
match error_code {
|
||||
// No error message is present in the message.
|
||||
MESSAGE_TERMINATOR => (),
|
||||
@@ -247,6 +255,8 @@ impl Server {
|
||||
}
|
||||
};
|
||||
|
||||
debug!("Sending CancelRequest");
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(16);
|
||||
bytes.put_i32(16);
|
||||
bytes.put_i32(CANCEL_REQUEST_CODE);
|
||||
@@ -290,6 +300,8 @@ impl Server {
|
||||
let code = message.get_u8() as char;
|
||||
let _len = message.get_i32();
|
||||
|
||||
trace!("Message: {}", code);
|
||||
|
||||
match code {
|
||||
// ReadyForQuery
|
||||
'Z' => {
|
||||
@@ -403,7 +415,7 @@ impl Server {
|
||||
|
||||
/// Claim this server as mine for the purposes of query cancellation.
|
||||
pub fn claim(&mut self, process_id: i32, secret_key: i32) {
|
||||
let mut guard = self.client_server_map.lock().unwrap();
|
||||
let mut guard = self.client_server_map.lock();
|
||||
guard.insert(
|
||||
(process_id, secret_key),
|
||||
(
|
||||
@@ -419,18 +431,9 @@ impl Server {
|
||||
/// It will use the simple query protocol.
|
||||
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
||||
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
|
||||
let mut query = BytesMut::from(&query.as_bytes()[..]);
|
||||
query.put_u8(0); // C-string terminator (NULL character).
|
||||
let query = simple_query(query);
|
||||
|
||||
let len = query.len() as i32 + 4;
|
||||
|
||||
let mut msg = BytesMut::with_capacity(len as usize + 1);
|
||||
|
||||
msg.put_u8(b'Q');
|
||||
msg.put_i32(len);
|
||||
msg.put_slice(&query[..]);
|
||||
|
||||
self.send(msg).await?;
|
||||
self.send(query).await?;
|
||||
|
||||
loop {
|
||||
let _ = self.recv().await?;
|
||||
|
||||
@@ -1,26 +1,62 @@
|
||||
use sha1::{Digest, Sha1};
|
||||
|
||||
// https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20
|
||||
const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD;
|
||||
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub enum ShardingFunction {
|
||||
PgBigintHash,
|
||||
Sha1,
|
||||
}
|
||||
|
||||
pub struct Sharder {
|
||||
shards: usize,
|
||||
sharding_function: ShardingFunction,
|
||||
}
|
||||
|
||||
impl Sharder {
|
||||
pub fn new(shards: usize) -> Sharder {
|
||||
Sharder { shards: shards }
|
||||
pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder {
|
||||
Sharder {
|
||||
shards,
|
||||
sharding_function,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shard(&self, key: i64) -> usize {
|
||||
match self.sharding_function {
|
||||
ShardingFunction::PgBigintHash => self.pg_bigint_hash(key),
|
||||
ShardingFunction::Sha1 => self.sha1(key),
|
||||
}
|
||||
}
|
||||
|
||||
/// Hash function used by Postgres to determine which partition
|
||||
/// to put the row in when using HASH(column) partitioning.
|
||||
/// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631
|
||||
/// Supports only 1 bigint at the moment, but we can add more later.
|
||||
pub fn pg_bigint_hash(&self, key: i64) -> usize {
|
||||
fn pg_bigint_hash(&self, key: i64) -> usize {
|
||||
let mut lohalf = key as u32;
|
||||
let hihalf = (key >> 32) as u32;
|
||||
lohalf ^= if key >= 0 { hihalf } else { !hihalf };
|
||||
Self::combine(0, Self::pg_u32_hash(lohalf)) as usize % self.shards
|
||||
}
|
||||
|
||||
/// Example of a hashing function based on SHA1.
|
||||
fn sha1(&self, key: i64) -> usize {
|
||||
let mut hasher = Sha1::new();
|
||||
|
||||
hasher.update(&key.to_string().as_bytes());
|
||||
|
||||
let result = hasher.finalize();
|
||||
|
||||
// Convert the SHA1 hash into hex so we can parse it as a large integer.
|
||||
let hex = format!("{:x}", result);
|
||||
|
||||
// Parse the last 8 bytes as an integer (8 bytes = bigint).
|
||||
let key = i64::from_str_radix(&hex[hex.len() - 8..], 16).unwrap() as usize;
|
||||
|
||||
key % self.shards
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn rot(x: u32, k: u32) -> u32 {
|
||||
(x << k) | (x >> (32 - k))
|
||||
@@ -109,36 +145,51 @@ mod test {
|
||||
// confirming that we implemented Postgres BIGINT hashing correctly.
|
||||
#[test]
|
||||
fn test_pg_bigint_hash() {
|
||||
let sharder = Sharder::new(5);
|
||||
let sharder = Sharder::new(5, ShardingFunction::PgBigintHash);
|
||||
|
||||
let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53];
|
||||
|
||||
for v in shard_0 {
|
||||
assert_eq!(sharder.pg_bigint_hash(v), 0);
|
||||
assert_eq!(sharder.shard(v), 0);
|
||||
}
|
||||
|
||||
let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54];
|
||||
|
||||
for v in shard_1 {
|
||||
assert_eq!(sharder.pg_bigint_hash(v), 1);
|
||||
assert_eq!(sharder.shard(v), 1);
|
||||
}
|
||||
|
||||
let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35];
|
||||
|
||||
for v in shard_2 {
|
||||
assert_eq!(sharder.pg_bigint_hash(v), 2);
|
||||
assert_eq!(sharder.shard(v), 2);
|
||||
}
|
||||
|
||||
let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43];
|
||||
|
||||
for v in shard_3 {
|
||||
assert_eq!(sharder.pg_bigint_hash(v), 3);
|
||||
assert_eq!(sharder.shard(v), 3);
|
||||
}
|
||||
|
||||
let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45];
|
||||
|
||||
for v in shard_4 {
|
||||
assert_eq!(sharder.pg_bigint_hash(v), 4);
|
||||
assert_eq!(sharder.shard(v), 4);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sha1_hash() {
|
||||
let sharder = Sharder::new(12, ShardingFunction::Sha1);
|
||||
let ids = vec![
|
||||
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
|
||||
];
|
||||
let shards = vec![
|
||||
4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3,
|
||||
];
|
||||
|
||||
for (i, id) in ids.iter().enumerate() {
|
||||
assert_eq!(sharder.shard(*id), shards[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
262
src/stats.rs
262
src/stats.rs
@@ -1,13 +1,17 @@
|
||||
use log::info;
|
||||
use log::{debug, info};
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
use statsd::Client;
|
||||
/// Events collector and publisher.
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::config::get_config;
|
||||
|
||||
// Stats used in SHOW STATS
|
||||
static LATEST_STATS: Lazy<Mutex<HashMap<String, i64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
|
||||
static STAT_PERIOD: u64 = 15000; //15 seconds
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum EventName {
|
||||
CheckoutTime,
|
||||
@@ -24,6 +28,7 @@ enum EventName {
|
||||
ServerTested,
|
||||
ServerLogin,
|
||||
ServerDisconnecting,
|
||||
FlushStatsToStatsD,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -44,155 +49,157 @@ impl Reporter {
|
||||
}
|
||||
|
||||
pub fn query(&self) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::Query,
|
||||
value: 1,
|
||||
process_id: None,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn transaction(&self) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::Transaction,
|
||||
value: 1,
|
||||
process_id: None,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn data_sent(&self, amount: usize) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::DataSent,
|
||||
value: amount as i64,
|
||||
process_id: None,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn data_received(&self, amount: usize) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::DataReceived,
|
||||
value: amount as i64,
|
||||
process_id: None,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn checkout_time(&self, ms: u128) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::CheckoutTime,
|
||||
value: ms as i64,
|
||||
process_id: None,
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_waiting(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ClientWaiting,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_active(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ClientActive,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_idle(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ClientIdle,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn client_disconnecting(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ClientDisconnecting,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_active(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ServerActive,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_idle(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ServerIdle,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_login(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ServerLogin,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_tested(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ServerTested,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
pub fn server_disconnecting(&self, process_id: i32) {
|
||||
let statistic = Event {
|
||||
let event = Event {
|
||||
name: EventName::ServerDisconnecting,
|
||||
value: 1,
|
||||
process_id: Some(process_id),
|
||||
};
|
||||
|
||||
let _ = self.tx.try_send(statistic);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Collector {
|
||||
rx: Receiver<Event>,
|
||||
tx: Sender<Event>,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl Collector {
|
||||
pub fn new(rx: Receiver<Event>) -> Collector {
|
||||
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
|
||||
Collector {
|
||||
rx: rx,
|
||||
rx,
|
||||
tx,
|
||||
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
|
||||
}
|
||||
}
|
||||
@@ -205,7 +212,15 @@ impl Collector {
|
||||
("total_xact_count", 0),
|
||||
("total_sent", 0),
|
||||
("total_received", 0),
|
||||
("total_xact_time", 0),
|
||||
("total_query_time", 0),
|
||||
("total_wait_time", 0),
|
||||
("avg_xact_time", 0),
|
||||
("avg_query_time", 0),
|
||||
("avg_xact_count", 0),
|
||||
("avg_sent", 0),
|
||||
("avg_received", 0),
|
||||
("avg_wait_time", 0),
|
||||
("maxwait_us", 0),
|
||||
("maxwait", 0),
|
||||
("cl_waiting", 0),
|
||||
@@ -217,10 +232,29 @@ impl Collector {
|
||||
("sv_tested", 0),
|
||||
]);
|
||||
|
||||
// Stats saved after each iteration of the flush event. Used in calculation
|
||||
// of averages in the last flush period.
|
||||
let mut old_stats: HashMap<String, i64> = HashMap::new();
|
||||
|
||||
// Track which state the client and server are at any given time.
|
||||
let mut client_server_states: HashMap<i32, EventName> = HashMap::new();
|
||||
|
||||
let mut now = Instant::now();
|
||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||
let tx = self.tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let mut interval =
|
||||
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::FlushStatsToStatsD,
|
||||
value: 0,
|
||||
process_id: None,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// The collector loop
|
||||
loop {
|
||||
let stat = match self.rx.recv().await {
|
||||
Some(stat) => stat,
|
||||
@@ -257,10 +291,11 @@ impl Collector {
|
||||
*counter += stat.value;
|
||||
|
||||
let counter = stats.entry("maxwait_us").or_insert(0);
|
||||
let mic_part = stat.value % 1_000_000;
|
||||
|
||||
// Report max time here
|
||||
if stat.value > *counter {
|
||||
*counter = stat.value;
|
||||
if mic_part > *counter {
|
||||
*counter = mic_part;
|
||||
}
|
||||
|
||||
let counter = stats.entry("maxwait").or_insert(0);
|
||||
@@ -284,65 +319,104 @@ impl Collector {
|
||||
EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
|
||||
client_server_states.remove(&stat.process_id.unwrap());
|
||||
}
|
||||
|
||||
EventName::FlushStatsToStatsD => {
|
||||
// Calculate connection states
|
||||
for (_, state) in &client_server_states {
|
||||
match state {
|
||||
EventName::ClientActive => {
|
||||
let counter = stats.entry("cl_active").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ClientWaiting => {
|
||||
let counter = stats.entry("cl_waiting").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ClientIdle => {
|
||||
let counter = stats.entry("cl_idle").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerIdle => {
|
||||
let counter = stats.entry("sv_idle").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerActive => {
|
||||
let counter = stats.entry("sv_active").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerTested => {
|
||||
let counter = stats.entry("sv_tested").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerLogin => {
|
||||
let counter = stats.entry("sv_login").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
_ => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
// Calculate averages
|
||||
for stat in &[
|
||||
"avg_query_count",
|
||||
"avgxact_count",
|
||||
"avg_sent",
|
||||
"avg_received",
|
||||
"avg_wait_time",
|
||||
] {
|
||||
let total_name = stat.replace("avg_", "total_");
|
||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
||||
|
||||
stats.insert(stat, avg);
|
||||
*old_value = new_value;
|
||||
}
|
||||
|
||||
debug!("{:?}", stats);
|
||||
|
||||
// Update latest stats used in SHOW STATS
|
||||
let mut guard = LATEST_STATS.lock();
|
||||
for (key, value) in &stats {
|
||||
guard.insert(key.to_string(), value.clone());
|
||||
}
|
||||
|
||||
let mut pipeline = self.client.pipeline();
|
||||
|
||||
for (key, value) in stats.iter() {
|
||||
pipeline.gauge(key, *value as f64);
|
||||
}
|
||||
|
||||
// These are re-calculated every iteration of the loop, so we don't want to add values
|
||||
// from the last iteration.
|
||||
for stat in &[
|
||||
"cl_active",
|
||||
"cl_waiting",
|
||||
"cl_idle",
|
||||
"sv_idle",
|
||||
"sv_active",
|
||||
"sv_tested",
|
||||
"sv_login",
|
||||
"maxwait",
|
||||
"maxwait_us",
|
||||
] {
|
||||
stats.insert(stat, 0);
|
||||
}
|
||||
|
||||
pipeline.send(&self.client);
|
||||
}
|
||||
};
|
||||
|
||||
// It's been 15 seconds. If there is no traffic, it won't publish anything,
|
||||
// but it also doesn't matter then.
|
||||
if now.elapsed().as_secs() > 15 {
|
||||
for (_, state) in &client_server_states {
|
||||
match state {
|
||||
EventName::ClientActive => {
|
||||
let counter = stats.entry("cl_active").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ClientWaiting => {
|
||||
let counter = stats.entry("cl_waiting").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ClientIdle => {
|
||||
let counter = stats.entry("cl_idle").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerIdle => {
|
||||
let counter = stats.entry("sv_idle").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerActive => {
|
||||
let counter = stats.entry("sv_active").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerTested => {
|
||||
let counter = stats.entry("sv_tested").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
EventName::ServerLogin => {
|
||||
let counter = stats.entry("sv_login").or_insert(0);
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
_ => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
info!("{:?}", stats);
|
||||
|
||||
let mut pipeline = self.client.pipeline();
|
||||
|
||||
for (key, value) in stats.iter_mut() {
|
||||
pipeline.gauge(key, *value as f64);
|
||||
*value = 0;
|
||||
}
|
||||
|
||||
pipeline.send(&self.client);
|
||||
|
||||
now = Instant::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_stats() -> HashMap<String, i64> {
|
||||
LATEST_STATS.lock().clone()
|
||||
}
|
||||
|
||||
4
src/userlist.json
Normal file
4
src/userlist.json
Normal file
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"sven": "clear_text_password",
|
||||
"sharding_user": "sharding_user"
|
||||
}
|
||||
57
src/userlist.rs
Normal file
57
src/userlist.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use arc_swap::{ArcSwap, Guard};
|
||||
use log::{error};
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use std::collections::{HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::errors::Error;
|
||||
|
||||
pub type UserList = HashMap<String, String>;
|
||||
static USER_LIST: Lazy<ArcSwap<UserList>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::new()));
|
||||
|
||||
pub fn get_user_list() -> Guard<Arc<UserList>> {
|
||||
USER_LIST.load()
|
||||
}
|
||||
|
||||
/// Parse the user list.
|
||||
pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
let mut contents = String::new();
|
||||
let mut file = match File::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(err) => {
|
||||
error!("Could not open '{}': {}", path, err.to_string());
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
match file.read_to_string(&mut contents).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
error!("Could not read config file: {}", err.to_string());
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
let map: HashMap<String, String> = serde_json::from_str(&contents).expect("JSON was not well-formatted");
|
||||
|
||||
|
||||
|
||||
USER_LIST.store(Arc::new(map.clone()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_config() {
|
||||
parse("userlist.json").await.unwrap();
|
||||
assert_eq!(get_user_list()["sven"], "clear_text_password");
|
||||
assert_eq!(get_user_list()["sharding_user"], "sharding_user");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user