Compare commits

..

14 Commits

Author SHA1 Message Date
Lev Kokotov
d412238f47 Implement SCRAM-SHA-256 for server authentication (PG14) (#76)
* Implement SCRAM-SHA-256

* test it

* trace

* move to community for auth

* hmm
2022-06-18 18:36:00 -07:00
dependabot[bot]
7782933f59 Bump regex from 1.5.4 to 1.5.5 (#75)
Bumps [regex](https://github.com/rust-lang/regex) from 1.5.4 to 1.5.5.
- [Release notes](https://github.com/rust-lang/regex/releases)
- [Changelog](https://github.com/rust-lang/regex/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/regex/compare/1.5.4...1.5.5)

---
updated-dependencies:
- dependency-name: regex
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2022-06-06 19:59:50 -07:00
Lev Kokotov
bac4e1f52c Only set application_name if it's different (#74)
* Only set application_name if it's different

* keep server named pgcat until something else changes
2022-06-05 09:48:06 -07:00
Lev Kokotov
37e3a86881 Pass application_name to server (#73)
* Pass application_name to server

* fmt
2022-06-03 00:15:50 -07:00
Lev Kokotov
61db13f614 Fix memory leak in client/server mapping (#71) 2022-05-18 16:24:03 -07:00
Lev Kokotov
fe32b5ef17 Reduce traffic on the stats channel (#69) 2022-05-17 13:05:25 -07:00
Lev Kokotov
54699222f8 Possible fix for clients waiting stat leak (#68) 2022-05-14 21:35:33 -07:00
Lev Kokotov
ccbca66e7a Poorly behaved client fix (#65)
* Poorly behaved client fix

* yes officer

* fix tests

* no useless rescue

* Looks ok
2022-05-09 09:09:22 -07:00
Lev Kokotov
df85139281 Update README. Comments. Version bump. (#60)
* update readme

* comments

* just a version bump
2022-03-10 01:33:29 -08:00
Lev Kokotov
509e4815a3 Update README.md 2022-03-08 17:48:26 -08:00
Lev Kokotov
5338ff2323 Update README.md 2022-03-08 17:46:46 -08:00
Lev Kokotov
1ea0a7f332 Update README.md 2022-03-08 17:45:54 -08:00
Lev Kokotov
d1b86d363d Update README.md 2022-03-08 17:38:51 -08:00
Lev Kokotov
b309ead58f Handle SIGTERM. Add docker-compose.yml (#59)
* docker-compsoe

* remove statsd config

* readme
2022-03-08 17:18:48 -08:00
26 changed files with 908 additions and 203 deletions

View File

@@ -12,13 +12,15 @@ jobs:
- image: cimg/rust:1.58.1
environment:
RUST_LOG: info
- image: cimg/postgres:14.0
auth:
username: mydockerhub-user
password: $DOCKERHUB_PASSWORD
- image: postgres:14
# auth:
# username: mydockerhub-user
# password: $DOCKERHUB_PASSWORD
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
# Add steps to the job
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
steps:

View File

@@ -29,9 +29,6 @@ healthcheck_timeout = 100
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
# Stats will be sent here
statsd_address = "127.0.0.1:8125"
#
# User to use for authentication against the server.
[user]

View File

@@ -12,7 +12,7 @@ function start_pgcat() {
}
# Setup the database with shards and user
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
@@ -72,7 +72,7 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
# Start PgCat in debug to demonstrate failover better
start_pgcat "debug"
start_pgcat "trace"
# Add latency to the replica at port 5433 slightly above the healthcheck timeout
toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica

93
Cargo.lock generated
View File

@@ -45,6 +45,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bb8"
version = "0.7.1"
@@ -109,22 +115,23 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.1.1"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d6b536309245c849479fba3da410962a43ed8e51c26b729208ec0ac2798d0"
checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "digest"
version = "0.10.1"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b697d66081d42af4fba142d56918a3cb21dc8eb63372c6b85d14f44fb9c5979b"
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [
"block-buffer",
"crypto-common",
"generic-array",
"subtle",
]
[[package]]
@@ -205,6 +212,15 @@ dependencies = [
"libc",
]
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
]
[[package]]
name = "humantime"
version = "2.1.0"
@@ -352,14 +368,16 @@ dependencies = [
[[package]]
name = "pgcat"
version = "0.1.0"
version = "0.1.0-beta2"
dependencies = [
"arc-swap",
"async-trait",
"base64",
"bb8",
"bytes",
"chrono",
"env_logger",
"hmac",
"log",
"md-5",
"num_cpus",
@@ -370,7 +388,9 @@ dependencies = [
"serde",
"serde_derive",
"sha-1",
"sha2",
"sqlparser",
"stringprep",
"tokio",
"toml",
]
@@ -462,9 +482,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.5.4"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
"aho-corasick",
"memchr",
@@ -511,6 +531,17 @@ dependencies = [
"digest",
]
[[package]]
name = "sha2"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@@ -541,6 +572,22 @@ dependencies = [
"log",
]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.86"
@@ -572,6 +619,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.16.1"
@@ -617,6 +679,21 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "unicode-bidi"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-normalization"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-xid"
version = "0.2.2"

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "0.1.0"
version = "0.2.0-beta1"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -25,3 +25,7 @@ log = "0.4"
arc-swap = "1"
env_logger = "0.9"
parking_lot = "0.11"
hmac = "0.12"
sha2 = "0.10"
base64 = "0.13"
stringprep = "0.1"

View File

@@ -8,4 +8,4 @@ COPY --from=builder /app/target/release/pgcat /usr/bin/pgcat
COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat
ENV RUST_LOG=info
ENTRYPOINT ["/usr/bin/pgcat"]
CMD ["pgcat"]

View File

@@ -4,9 +4,9 @@
![PgCat](./pgcat3.png)
Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover support.
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
**Alpha**: looking for alpha testers, see [#35](https://github.com/levkk/pgcat/issues/35).
**Beta**: looking for beta testers, see [#35](https://github.com/levkk/pgcat/issues/35).
## Features
| **Feature** | **Status** | **Comments** |
@@ -18,14 +18,23 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su
| Load balancing of read queries | :white_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
| Statistics reporting | :white_check_mark: | Statistics similar to PgBouncers are reported via StatsD. |
| Live configuration reloading | :construction_worker: | Reload config with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)`. Not all settings can be reloaded without a restart. |
| Statistics | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. |
| Live configuration reloading | :white_check_mark: | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. |
| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. |
| Admin database | :white_check_mark: | The admin database, similar to PgBouncer's, allows to query for statistics and reload the configuration. |
## Deployment
See `Dockerfile` for example deployment using Docker. The pooler is configured to spawn 4 workers so 4 CPUs are recommended for optimal performance.
That setting can be adjusted to spawn as many (or as little) workers as needed.
See `Dockerfile` for example deployment using Docker. The pooler is configured to spawn 4 workers so 4 CPUs are recommended for optimal performance. That setting can be adjusted to spawn as many (or as little) workers as needed.
For quick local example, use the Docker Compose environment provided:
```bash
docker-compose up
# In a new terminal:
psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
```
### Config
@@ -39,7 +48,6 @@ That setting can be adjusted to spawn as many (or as little) workers as needed.
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| `statsd_address` | StatsD host and port. Statistics will be sent there every 15 seconds. | `127.0.0.1:8125` |
| | | |
| **`user`** | | |
| `name` | The user name. | `sharding_user` |
@@ -82,7 +90,7 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing.
| Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. |
| Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. |
| Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
| Statistics reporting | :x: | :white_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |
| Statistics | :white_check_mark: | :white_check_mark: | Query the admin database with `psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS'`. |
| Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
## Usage
@@ -225,11 +233,15 @@ SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts unt
### Statistics reporting
Stats are reported using StatsD every 15 seconds. The address is configurable with `statsd_address`, the default is `127.0.0.1:8125`. The stats are very similar to what Pgbouncer reports and the names are kept to be comparable.
The stats are very similar to what Pgbouncer reports and the names are kept to be comparable. They are accessible by querying the admin database `pgcat`, and `pgbouncer` for compatibility.
```
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
```
### Live configuration reloading
The config can be reloaded by sending a `kill -s SIGHUP` to the process. Not all settings are currently supported by live reload:
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. Not all settings are currently supported by live reload:
| **Config** | **Requires restart** |
|-------------------------|----------------------|
@@ -239,7 +251,6 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process. Not all
| `connect_timeout` | yes |
| `healthcheck_timeout` | no |
| `ban_time` | no |
| `statsd_address` | yes |
| `user` | yes |
| `shards` | yes |
| `default_role` | no |

16
docker-compose.yml Normal file
View File

@@ -0,0 +1,16 @@
version: "3"
services:
postgres:
image: postgres:13
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: md5
pgcat:
build: .
command:
- "pgcat"
- "/etc/pgcat/pgcat.toml"
volumes:
- "${PWD}/examples/docker/pgcat.toml:/etc/pgcat/pgcat.toml"
ports:
- "6432:6432"

105
examples/docker/pgcat.toml Normal file
View File

@@ -0,0 +1,105 @@
#
# PgCat config example.
#
#
# General pooler settings
[general]
# What IP to run on, 0.0.0.0 means accessible from everywhere.
host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = 6432
# How many connections to allocate per server.
pool_size = 15
# Pool mode (see PgBouncer docs for more).
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
# How long to wait before aborting a server connection (ms).
connect_timeout = 5000
# How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout = 1000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
#
# User to use for authentication against the server.
[user]
name = "postgres"
password = "postgres"
#
# Shards in the cluster
[shards]
# Shard 0
[shards.0]
# [ host, port, role ]
servers = [
[ "postgres", 5432, "primary" ],
[ "postgres", 5432, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
# Database name (e.g. "postgres")
database = "postgres"
[shards.1]
# [ host, port, role ]
servers = [
[ "postgres", 5432, "primary" ],
[ "postgres", 5432, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
database = "postgres"
[shards.2]
# [ host, port, role ]
servers = [
[ "postgres", 5432, "primary" ],
[ "postgres", 5432, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
database = "postgres"
# Settings for our query routing layer.
[query_router]
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"
# Query parser. If enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = false
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"

View File

@@ -29,9 +29,6 @@ healthcheck_timeout = 1000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
# Stats will be sent here
statsd_address = "127.0.0.1:8125"
#
# User to use for authentication against the server.
[user]

View File

@@ -1,8 +1,8 @@
/// Admin database.
use bytes::{Buf, BufMut, BytesMut};
use log::{info, trace};
use tokio::net::tcp::OwnedWriteHalf;
use std::collections::HashMap;
use tokio::net::tcp::OwnedWriteHalf;
use crate::config::{get_config, parse};
use crate::errors::Error;
@@ -10,7 +10,7 @@ use crate::messages::*;
use crate::pool::ConnectionPool;
use crate::stats::get_stats;
/// Handle admin client
/// Handle admin client.
pub async fn handle_admin(
stream: &mut OwnedWriteHalf,
mut query: BytesMut,
@@ -58,7 +58,7 @@ pub async fn handle_admin(
}
}
/// SHOW LISTS
/// Column-oriented statistics.
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats();
@@ -125,7 +125,7 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
write_all_half(stream, res).await
}
/// SHOW VERSION
/// Show PgCat version.
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let mut res = BytesMut::new();
@@ -140,7 +140,7 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
write_all_half(stream, res).await
}
/// SHOW POOLS
/// Show utilization of connection pools for each shard and replicas.
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats();
let config = {
@@ -189,6 +189,7 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
@@ -196,7 +197,7 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
write_all_half(stream, res).await
}
/// SHOW DATABASES
/// Show shards and replicas.
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let guard = get_config();
let config = &*guard.clone();
@@ -221,7 +222,6 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R
let mut res = BytesMut::new();
// RowDescription
res.put(row_description(&columns));
for shard in 0..pool.shards() {
@@ -265,7 +265,7 @@ async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
custom_protocol_response_ok(stream, "SET").await
}
/// RELOAD
/// Reload the configuration file without restarting the process.
async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
info!("Reloading config");
@@ -280,7 +280,6 @@ async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let mut res = BytesMut::new();
// CommandComplete
res.put(command_complete("RELOAD"));
// ReadyForQuery
@@ -291,13 +290,14 @@ async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
write_all_half(stream, res).await
}
/// Shows current configuration.
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let guard = get_config();
let config = &*guard.clone();
let config: HashMap<String, String> = config.into();
drop(guard);
// Configs that cannot be changed dynamically.
// Configs that cannot be changed without restarting.
let immutables = ["host", "port", "connect_timeout"];
// Columns
@@ -327,6 +327,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
@@ -334,7 +335,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
write_all_half(stream, res).await
}
/// SHOW STATS
/// Show shard and replicas statistics.
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let columns = vec![
("database", DataType::Text),
@@ -378,6 +379,7 @@ async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

View File

@@ -1,16 +1,13 @@
/// Implementation of the PostgreSQL client.
/// We are pretending to the server in this scenario,
/// and this module implements that.
/// Handle clients by pretending to be a PostgreSQL server.
use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error, trace};
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
};
use std::collections::HashMap;
use crate::admin::handle_admin;
use crate::config::get_config;
use crate::constants::*;
@@ -23,53 +20,52 @@ use crate::stats::Reporter;
/// The client state. One of these is created per client.
pub struct Client {
// The reads are buffered (8K by default).
/// The reads are buffered (8K by default).
read: BufReader<OwnedReadHalf>,
// We buffer the writes ourselves because we know the protocol
// better than a stock buffer.
/// We buffer the writes ourselves because we know the protocol
/// better than a stock buffer.
write: OwnedWriteHalf,
// Internal buffer, where we place messages until we have to flush
// them to the backend.
/// Internal buffer, where we place messages until we have to flush
/// them to the backend.
buffer: BytesMut,
// The client was started with the sole reason to cancel another running query.
/// The client was started with the sole reason to cancel another running query.
cancel_mode: bool,
// In transaction mode, the connection is released after each transaction.
// Session mode has slightly higher throughput per client, but lower capacity.
/// In transaction mode, the connection is released after each transaction.
/// Session mode has slightly higher throughput per client, but lower capacity.
transaction_mode: bool,
// For query cancellation, the client is given a random process ID and secret on startup.
/// For query cancellation, the client is given a random process ID and secret on startup.
process_id: i32,
secret_key: i32,
// Clients are mapped to servers while they use them. This allows a client
// to connect and cancel a query.
/// Clients are mapped to servers while they use them. This allows a client
/// to connect and cancel a query.
client_server_map: ClientServerMap,
// Client parameters, e.g. user, client_encoding, etc.
/// Client parameters, e.g. user, client_encoding, etc.
#[allow(dead_code)]
parameters: HashMap<String, String>,
// Statistics
/// Statistics
stats: Reporter,
// Clients want to talk to admin
/// Clients want to talk to admin database.
admin: bool,
// Last address the client talked to
/// Last address the client talked to.
last_address_id: Option<usize>,
// Last server process id we talked to
/// Last server process id we talked to.
last_server_id: Option<i32>,
}
impl Client {
/// Given a TCP socket, trick the client into thinking we are
/// the Postgres server. Perform the authentication and place
/// the client in query-ready mode.
/// Perform client startup sequence.
/// See docs: <https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.3>
pub async fn startup(
mut stream: TcpStream,
client_server_map: ClientServerMap,
@@ -82,14 +78,12 @@ impl Client {
loop {
trace!("Waiting for StartupMessage");
// Could be StartupMessage or SSLRequest
// which makes this variable length.
// Could be StartupMessage, SSLRequest or CancelRequest.
let len = match stream.read_i32().await {
Ok(len) => len,
Err(_) => return Err(Error::ClientBadStartup),
};
// Read whatever is left.
let mut startup = vec![0u8; len as usize - 4];
match stream.read_exact(&mut startup).await {
@@ -189,7 +183,7 @@ impl Client {
}
}
/// Client loop. We handle all messages between the client and the database here.
/// Handle a connected and authenticated client.
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
@@ -225,14 +219,14 @@ impl Client {
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocols.
// or issue commands for our sharding and server selection protocol.
loop {
trace!("Client idle, waiting for message");
// Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol).
// We can parse it here before grabbing a server from the pool,
// in case the client is sending some control messages, e.g.
// in case the client is sending some custom protocol messages, e.g.
// SET SHARDING KEY TO 'bigint';
let mut message = read_message(&mut self.read).await?;
@@ -242,43 +236,48 @@ impl Client {
return Ok(());
}
// Handle admin database real quick
// Handle admin database queries.
if self.admin {
trace!("Handling admin command");
handle_admin(&mut self.write, message, pool.clone()).await?;
continue;
}
// Handle all custom protocol commands here.
// Handle all custom protocol commands, if any.
match query_router.try_execute_command(message.clone()) {
// Normal query
// Normal query, not a custom command.
None => {
// Attempt to infer which server we want to query, i.e. primary or replica.
if query_router.query_parser_enabled() && query_router.role() == None {
query_router.infer_role(message.clone());
}
}
// SET SHARD TO
Some((Command::SetShard, _)) => {
custom_protocol_response_ok(&mut self.write, &format!("SET SHARD")).await?;
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
continue;
}
// SET SHARDING KEY TO
Some((Command::SetShardingKey, _)) => {
custom_protocol_response_ok(&mut self.write, &format!("SET SHARDING KEY"))
.await?;
custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
continue;
}
// SET SERVER ROLE TO
Some((Command::SetServerRole, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
continue;
}
// SHOW SERVER ROLE
Some((Command::ShowServerRole, value)) => {
show_response(&mut self.write, "server role", &value).await?;
continue;
}
// SHOW SHARD
Some((Command::ShowShard, value)) => {
show_response(&mut self.write, "shard", &value).await?;
continue;
@@ -290,7 +289,7 @@ impl Client {
error_response(
&mut self.write,
&format!(
"shard '{}' is more than configured '{}'",
"shard {} is more than configured {}",
query_router.shard(),
pool.shards()
),
@@ -301,7 +300,7 @@ impl Client {
debug!("Waiting for connection from pool");
// Grab a server from the pool: the client issued a regular query.
// Grab a server from the pool.
let connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id)
.await
@@ -322,18 +321,18 @@ impl Client {
let address = connection.1;
let server = &mut *reference;
// Claim this server as mine for query cancellation.
// Server is assigned to the client in case the client wants to
// cancel a query later.
server.claim(self.process_id, self.secret_key);
// "disconnect" from the previous server stats-wise
// Update statistics.
if let Some(last_address_id) = self.last_address_id {
self.stats
.client_disconnecting(self.process_id, last_address_id);
}
// Client active & server active
self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id(), address.id);
self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id());
@@ -343,9 +342,20 @@ impl Client {
server.address()
);
// Set application_name if any.
// TODO: investigate other parameters and set them too.
if self.parameters.contains_key("application_name") {
server
.set_name(&self.parameters["application_name"])
.await?;
}
// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode.
//
// If the client is in session mode, no more custom protocol
// commands will be accepted.
loop {
let mut message = if message.len() == 0 {
trace!("Waiting for message inside transaction or in session mode");
@@ -353,11 +363,13 @@ impl Client {
match read_message(&mut self.read).await {
Ok(message) => message,
Err(err) => {
// Client disconnected without warning.
// Client disconnected inside a transaction.
// Clean up the server and re-use it.
// This prevents connection thrashing by bad clients.
if server.in_transaction() {
// Client left dirty server. Clean up and proceed
// without thrashing this connection.
server.query("ROLLBACK; DISCARD ALL;").await?;
server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?;
}
return Err(err);
@@ -383,13 +395,11 @@ impl Client {
'Q' => {
debug!("Sending query to server");
// TODO: implement retries here for read-only transactions.
server.send(original).await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
// TODO: implement retries here for read-only transactions.
let response = server.recv().await?;
// Send server reply to the client.
@@ -409,7 +419,6 @@ impl Client {
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
// The transaction is over, we can release the connection back to the pool.
if !server.in_transaction() {
// Report transaction executed statistics.
self.stats.transaction(self.process_id, address.id);
@@ -429,11 +438,14 @@ impl Client {
// connection before releasing into the pool.
// Pgbouncer closes the connection which leads to
// connection thrashing when clients misbehave.
// This pool will protect the database. :salute:
if server.in_transaction() {
server.query("ROLLBACK; DISCARD ALL;").await?;
server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?;
}
self.release();
return Ok(());
}
@@ -468,7 +480,6 @@ impl Client {
self.buffer.put(&original[..]);
// TODO: retries for read-only transactions.
server.send(self.buffer.clone()).await?;
self.buffer.clear();
@@ -476,7 +487,6 @@ impl Client {
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
// TODO: retries for read-only transactions
let response = server.recv().await?;
match write_all_half(&mut self.write, response).await {
@@ -495,11 +505,11 @@ impl Client {
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break;
@@ -529,11 +539,11 @@ impl Client {
}
};
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break;
@@ -556,7 +566,7 @@ impl Client {
}
}
/// Release the server from being mine. I can't cancel its queries anymore.
/// Release the server from the client: it can't cancel its queries anymore.
pub fn release(&self) {
let mut guard = self.client_server_map.lock();
guard.remove(&(self.process_id, self.secret_key));
@@ -565,11 +575,10 @@ impl Client {
impl Drop for Client {
fn drop(&mut self) {
// Disconnect the client
// Update statistics.
if let Some(address_id) = self.last_address_id {
self.stats.client_disconnecting(self.process_id, address_id);
// The server is now idle
if let Some(process_id) = self.last_server_id {
self.stats.server_idle(process_id, address_id);
}

View File

@@ -1,18 +1,20 @@
/// Parse the configuration file.
use arc_swap::{ArcSwap, Guard};
use log::{error, info};
use once_cell::sync::Lazy;
use serde_derive::Deserialize;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use toml;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::errors::Error;
/// Globally available configuration.
static CONFIG: Lazy<ArcSwap<Config>> = Lazy::new(|| ArcSwap::from_pointee(Config::default()));
/// Server role: primary or replica.
#[derive(Clone, PartialEq, Deserialize, Hash, std::cmp::Eq, Debug, Copy)]
pub enum Role {
Primary,
@@ -46,6 +48,7 @@ impl PartialEq<Role> for Option<Role> {
}
}
/// Address identifying a PostgreSQL server uniquely.
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
pub struct Address {
pub id: usize,
@@ -70,6 +73,7 @@ impl Default for Address {
}
impl Address {
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
pub fn name(&self) -> String {
match self.role {
Role::Primary => format!("shard_{}_primary", self.shard),
@@ -79,6 +83,7 @@ impl Address {
}
}
/// PostgreSQL user.
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
pub struct User {
pub name: String,
@@ -94,6 +99,7 @@ impl Default for User {
}
}
/// General configuration.
#[derive(Deserialize, Debug, Clone)]
pub struct General {
pub host: String,
@@ -103,7 +109,6 @@ pub struct General {
pub connect_timeout: u64,
pub healthcheck_timeout: u64,
pub ban_time: i64,
pub statsd_address: String,
}
impl Default for General {
@@ -116,11 +121,11 @@ impl Default for General {
connect_timeout: 5000,
healthcheck_timeout: 1000,
ban_time: 60,
statsd_address: String::from("127.0.0.1:8125"),
}
}
}
/// Shard configuration.
#[derive(Deserialize, Debug, Clone)]
pub struct Shard {
pub servers: Vec<(String, u16, String)>,
@@ -136,6 +141,7 @@ impl Default for Shard {
}
}
/// Query Router configuration.
#[derive(Deserialize, Debug, Clone)]
pub struct QueryRouter {
pub default_role: String,
@@ -155,6 +161,7 @@ impl Default for QueryRouter {
}
}
/// Configuration wrapper.
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub path: Option<String>,
@@ -198,10 +205,6 @@ impl From<&Config> for std::collections::HashMap<String, String> {
config.general.healthcheck_timeout.to_string(),
),
("ban_time".to_string(), config.general.ban_time.to_string()),
(
"statsd_address".to_string(),
config.general.statsd_address.to_string(),
),
(
"default_role".to_string(),
config.query_router.default_role.to_string(),
@@ -223,6 +226,7 @@ impl From<&Config> for std::collections::HashMap<String, String> {
}
impl Config {
/// Print current configuration.
pub fn show(&self) {
info!("Pool size: {}", self.general.pool_size);
info!("Pool mode: {}", self.general.pool_mode);
@@ -237,11 +241,14 @@ impl Config {
}
}
/// Get a read-only instance of the configuration
/// from anywhere in the app.
/// ArcSwap makes this cheap and quick.
pub fn get_config() -> Guard<Arc<Config>> {
CONFIG.load()
}
/// Parse the config.
/// Parse the configuration file located at the path.
pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new();
let mut file = match File::open(path).await {
@@ -352,6 +359,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
config.path = Some(path.to_string());
// Update the configuration globally.
CONFIG.store(Arc::new(config.clone()));
Ok(())

View File

@@ -1,7 +1,6 @@
/// Various protocol constants, as defined in
/// https://www.postgresql.org/docs/12/protocol-message-formats.html
/// <https://www.postgresql.org/docs/12/protocol-message-formats.html>
/// and elsewhere in the source code.
/// Also other constants we use elsewhere.
// Used in the StartupMessage to indicate regular handshake.
pub const PROTOCOL_VERSION_NUMBER: i32 = 196608;
@@ -15,6 +14,13 @@ pub const CANCEL_REQUEST_CODE: i32 = 80877102;
// AuthenticationMD5Password
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
// SASL
pub const SASL: i32 = 10;
pub const SASL_CONTINUE: i32 = 11;
pub const SASL_FINAL: i32 = 12;
pub const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
pub const NONCE_LENGTH: usize = 24;
// AuthenticationOk
pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;

View File

@@ -1,12 +1,12 @@
/// Errors.
/// Various errors.
#[derive(Debug, PartialEq)]
pub enum Error {
SocketError,
// ClientDisconnected,
ClientBadStartup,
ProtocolSyncError,
ServerError,
// ServerTimeout,
// DirtyServer,
BadConfig,
AllServersDown,
}

View File

@@ -54,23 +54,20 @@ mod errors;
mod messages;
mod pool;
mod query_router;
mod scram;
mod server;
mod sharding;
mod stats;
// Support for query cancellation: this maps our process_ids and
// secret keys to the backend's.
use config::get_config;
use pool::{ClientServerMap, ConnectionPool};
use stats::{Collector, Reporter};
/// Main!
#[tokio::main(worker_threads = 4)]
async fn main() {
env_logger::init();
info!("Welcome to PgCat! Meow.");
// Prepare regexes
if !query_router::QueryRouter::setup() {
error!("Could not setup query router");
return;
@@ -84,7 +81,6 @@ async fn main() {
String::from("pgcat.toml")
};
// Prepare the config
match config::parse(&config_file).await {
Ok(_) => (),
Err(err) => {
@@ -94,8 +90,8 @@ async fn main() {
};
let config = get_config();
let addr = format!("{}:{}", config.general.host, config.general.port);
let listener = match TcpListener::bind(&addr).await {
Ok(sock) => sock,
Err(err) => {
@@ -105,18 +101,20 @@ async fn main() {
};
info!("Running on {}", addr);
config.show();
// Tracks which client is connected to which server for query cancellation.
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
// Collect statistics and send them to StatsD
// Statistics reporting.
let (tx, rx) = mpsc::channel(100);
// Connection pool for all shards and replicas
// Connection pool that allows to query all shards and replicas.
let mut pool =
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
// Statistics collector task.
let collector_tx = tx.clone();
let addresses = pool.databases();
tokio::task::spawn(async move {
@@ -135,7 +133,7 @@ async fn main() {
info!("Waiting for clients");
// Main app runs here.
// Client connection loop.
tokio::task::spawn(async move {
loop {
let pool = pool.clone();
@@ -151,7 +149,7 @@ async fn main() {
}
};
// Client goes to another thread, bye.
// Handle client.
tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();
match client::Client::startup(socket, client_server_map, server_info, reporter)
@@ -185,7 +183,7 @@ async fn main() {
}
});
// Reload config
// Reload config:
// kill -SIGHUP $(pgrep pgcat)
tokio::task::spawn(async move {
let mut stream = unix_signal(SignalKind::hangup()).unwrap();
@@ -205,16 +203,15 @@ async fn main() {
}
});
// Setup shut down sequence
match signal::ctrl_c().await {
Ok(()) => {
info!("Shutting down...");
}
// Exit on Ctrl-C (SIGINT) and SIGTERM.
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
}
tokio::select! {
_ = signal::ctrl_c() => (),
_ = term_signal.recv() => (),
};
info!("Shutting down...");
}
/// Format chrono::Duration to be more human-friendly.

View File

@@ -222,7 +222,7 @@ pub async fn custom_protocol_response_ok(
/// Send a custom error message to the client.
/// Tell the client we are ready for the next query and no rollback is necessary.
/// Docs on error codes: https://www.postgresql.org/docs/12/errcodes-appendix.html
/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Result<(), Error> {
let mut error = BytesMut::new();
@@ -339,6 +339,7 @@ pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
res
}
/// Create a DataRow message.
pub fn data_row(row: &Vec<String>) -> BytesMut {
let mut res = BytesMut::new();
let mut data_row = BytesMut::new();
@@ -358,6 +359,7 @@ pub fn data_row(row: &Vec<String>) -> BytesMut {
res
}
/// Create a CommandComplete message.
pub fn command_complete(command: &str) -> BytesMut {
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
let mut res = BytesMut::new();

View File

@@ -1,24 +1,23 @@
/// Pooling and failover and banlist.
/// Pooling, failover and banlist.
use async_trait::async_trait;
use bb8::{ManageConnection, Pool, PooledConnection};
use bytes::BytesMut;
use chrono::naive::NaiveDateTime;
use log::{debug, error, info, warn};
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use crate::config::{get_config, Address, Role, User};
use crate::errors::Error;
use crate::server::Server;
use crate::stats::Reporter;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
// Banlist: bad servers go in here.
pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
/// The globally accessible connection pool.
#[derive(Clone, Debug)]
pub struct ConnectionPool {
databases: Vec<Vec<Pool<ServerPool>>>,
@@ -29,7 +28,7 @@ pub struct ConnectionPool {
}
impl ConnectionPool {
/// Construct the connection pool from a config file.
/// Construct the connection pool from the configuration.
pub async fn from_config(
client_server_map: ClientServerMap,
stats: Reporter,
@@ -204,15 +203,12 @@ impl ConnectionPool {
}
while allowed_attempts > 0 {
// Round-robin each client's queries.
// If a client only sends one query and then disconnects, it doesn't matter
// which replica it'll go to.
// Round-robin replicas.
self.round_robin += 1;
let index = self.round_robin % addresses.len();
let address = &addresses[index];
self.stats.client_waiting(process_id, address.id);
// Make sure you're getting a primary or a replica
// as per request. If no specific role is requested, the first
// available will be chosen.
@@ -226,6 +222,9 @@ impl ConnectionPool {
continue;
}
// Indicate we're waiting on a server connection from a pool.
self.stats.client_waiting(process_id, address.id);
// Check if we can connect
let mut conn = match self.databases[shard][index].get().await {
Ok(conn) => conn,
@@ -239,7 +238,7 @@ impl ConnectionPool {
}
};
// // Check if this server is alive with a health check
// // Check if this server is alive with a health check.
let server = &mut *conn;
let healthcheck_timeout = get_config().general.healthcheck_timeout;
@@ -251,7 +250,7 @@ impl ConnectionPool {
)
.await
{
// Check if health check succeeded
// Check if health check succeeded.
Ok(res) => match res {
Ok(_) => {
self.stats
@@ -259,8 +258,11 @@ impl ConnectionPool {
self.stats.server_idle(conn.process_id(), address.id);
return Ok((conn, address.clone()));
}
// Health check failed.
Err(_) => {
error!("Banning replica {} because of failed health check", index);
// Don't leave a bad connection in the pool.
server.mark_bad();
@@ -271,7 +273,8 @@ impl ConnectionPool {
continue;
}
},
// Health check never came back, database is really really down
// Health check timed out.
Err(_) => {
error!("Banning replica {} because of health check timeout", index);
// Don't leave a bad connection in the pool.
@@ -358,14 +361,18 @@ impl ConnectionPool {
}
}
/// Get the number of configured shards.
pub fn shards(&self) -> usize {
self.databases.len()
}
/// Get the number of servers (primary and replicas)
/// configured for a shard.
pub fn servers(&self, shard: usize) -> usize {
self.addresses[shard].len()
}
/// Get the total number of servers (databases) we are connected to.
pub fn databases(&self) -> usize {
let mut databases = 0;
for shard in 0..self.shards() {
@@ -374,15 +381,18 @@ impl ConnectionPool {
databases
}
/// Get pool state for a particular shard server as reported by bb8.
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
self.databases[shard][server].state()
}
/// Get the address information for a shard server.
pub fn address(&self, shard: usize, server: usize) -> &Address {
&self.addresses[shard][server]
}
}
/// Wrapper for the bb8 connection pool.
pub struct ServerPool {
address: Address,
user: User,
@@ -427,6 +437,7 @@ impl ManageConnection for ServerPool {
let process_id = rand::random::<i32>();
self.stats.server_login(process_id, self.address.id);
// Connect to the PostgreSQL server.
match Server::startup(
&self.address,
&self.user,

View File

@@ -1,5 +1,3 @@
use crate::config::{get_config, Role};
use crate::sharding::{Sharder, ShardingFunction};
/// Route queries automatically based on explicitely requested
/// or implied query characteristics.
use bytes::{Buf, BytesMut};
@@ -10,6 +8,10 @@ use sqlparser::ast::Statement::{Query, StartTransaction};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use crate::config::{get_config, Role};
use crate::sharding::{Sharder, ShardingFunction};
/// Regexes used to parse custom commands.
const CUSTOM_SQL_REGEXES: [&str; 5] = [
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
@@ -18,6 +20,7 @@ const CUSTOM_SQL_REGEXES: [&str; 5] = [
r"(?i)^ *SHOW SERVER ROLE *;? *$",
];
/// Custom commands.
#[derive(PartialEq, Debug)]
pub enum Command {
SetShardingKey,
@@ -27,37 +30,39 @@ pub enum Command {
ShowServerRole,
}
// Quick test
/// Quickly test for match when a query is received.
static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
// Capture value
// Get the value inside the custom command.
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
/// The query router.
pub struct QueryRouter {
// By default, queries go here, unless we have better information
// about what the client wants.
/// By default, queries go here, unless we have better information
/// about what the client wants.
default_server_role: Option<Role>,
// Number of shards in the cluster.
/// Number of shards in the cluster.
shards: usize,
// Which shard we should be talking to right now.
/// Which shard we should be talking to right now.
active_shard: Option<usize>,
// Should we be talking to a primary or a replica?
/// Which server should we be talking to.
active_role: Option<Role>,
// Include the primary into the replica pool?
/// Include the primary into the replica pool for reads.
primary_reads_enabled: bool,
// Should we try to parse queries?
/// Should we try to parse queries to route them to replicas or primary automatically.
query_parser_enabled: bool,
// Which sharding function are we using?
/// Which sharding function we're using.
sharding_function: ShardingFunction,
}
impl QueryRouter {
/// One-time initialization of regexes.
pub fn setup() -> bool {
let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) {
Ok(rgx) => rgx,
@@ -88,6 +93,7 @@ impl QueryRouter {
}
}
/// Create a new instance of the query router. Each client gets its own.
pub fn new() -> QueryRouter {
let config = get_config();
@@ -120,6 +126,7 @@ impl QueryRouter {
pub fn try_execute_command(&mut self, mut buf: BytesMut) -> Option<(Command, String)> {
let code = buf.get_u8() as char;
// Only simple protocol supported for commands.
if code != 'Q' {
return None;
}
@@ -158,8 +165,7 @@ impl QueryRouter {
// figured out a better way just yet. I think I can write a single Regex
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
//
// I think this is faster than running the Regex engine 5 times, so
// this is a strong maybe for me so far.
// I think this is faster than running the Regex engine 5 times.
match regex_list[matches[0]].captures(&query) {
Some(captures) => match captures.get(1) {
Some(value) => value.as_str().to_string(),
@@ -221,7 +227,6 @@ impl QueryRouter {
}
"default" => {
// TODO: reset query parser to default here.
self.active_role = self.default_server_role;
self.query_parser_enabled = get_config().query_router.query_parser_enabled;
self.active_role
@@ -243,12 +248,14 @@ impl QueryRouter {
let len = buf.get_i32() as usize;
let query = match code {
// Query
'Q' => {
let query = String::from_utf8_lossy(&buf[..len - 5]).to_string();
debug!("Query: '{}'", query);
query
}
// Parse (prepared statement)
'P' => {
let mut start = 0;
let mut end;
@@ -271,6 +278,7 @@ impl QueryRouter {
query.replace("$", "") // Remove placeholders turning them into "values"
}
_ => return false,
};
@@ -334,6 +342,7 @@ impl QueryRouter {
self.query_parser_enabled
}
/// Allows to toggle primary reads in tests.
#[allow(dead_code)]
pub fn toggle_primary_reads(&mut self, value: bool) {
self.primary_reads_enabled = value;

311
src/scram.rs Normal file
View File

@@ -0,0 +1,311 @@
// SCRAM authentication...largely copy/pasted from
// https://github.com/sfackler/rust-postgres/.
use bytes::BytesMut;
use hmac::{Hmac, Mac};
use rand::{self, Rng};
use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
use std::fmt::Write;
use crate::constants::*;
use crate::errors::Error;
fn normalize(pass: &[u8]) -> Vec<u8> {
let pass = match std::str::from_utf8(pass) {
Ok(pass) => pass,
Err(_) => return pass.to_vec(),
};
match stringprep::saslprep(pass) {
Ok(pass) => pass.into_owned().into_bytes(),
Err(_) => pass.as_bytes().to_vec(),
}
}
pub struct ScramSha256 {
password: String,
salted_password: [u8; 32],
auth_message: String,
message: BytesMut,
nonce: String,
}
impl ScramSha256 {
pub fn new(password: &str) -> ScramSha256 {
let mut rng = rand::thread_rng();
let nonce = (0..NONCE_LENGTH)
.map(|_| {
let mut v = rng.gen_range(0x21u8..0x7e);
if v == 0x2c {
v = 0x7e
}
v as char
})
.collect::<String>();
Self::from_nonce(password, &nonce)
}
pub fn from_nonce(password: &str, nonce: &str) -> ScramSha256 {
let message = BytesMut::from(&format!("{}n=,r={}", "n,,", nonce).as_bytes()[..]);
ScramSha256 {
password: password.to_string(),
nonce: String::from(nonce),
message,
salted_password: [0u8; 32],
auth_message: String::new(),
}
}
pub fn message(&mut self) -> BytesMut {
self.message.clone()
}
pub fn update(&mut self, message: &BytesMut) -> Result<BytesMut, Error> {
let server_message = Message::parse(message)?;
if !server_message.nonce.starts_with(&self.nonce) {
// trace!("Bad server nonce");
return Err(Error::ProtocolSyncError);
}
let salt = match base64::decode(&server_message.salt) {
Ok(salt) => salt,
Err(_) => return Err(Error::ProtocolSyncError),
};
let salted_password = Self::hi(
&normalize(&self.password.as_bytes()[..]),
&salt,
server_message.iterations,
);
self.salted_password = salted_password;
let mut hmac = Hmac::<Sha256>::new_from_slice(&salted_password)
.expect("HMAC is able to accept all key sizes");
hmac.update(b"Client Key");
let client_key = hmac.finalize().into_bytes();
let mut hash = Sha256::default();
hash.update(client_key.as_slice());
let stored_key = hash.finalize_fixed();
let mut cbind_input = vec![];
cbind_input.extend("n,,".as_bytes());
let cbind_input = base64::encode(&cbind_input);
self.message.clear();
write!(
&mut self.message,
"c={},r={}",
cbind_input, server_message.nonce
)
.unwrap();
let auth_message = format!(
"n=,r={},{},{}",
self.nonce,
String::from_utf8_lossy(&message[..]),
String::from_utf8_lossy(&self.message[..])
);
let mut hmac = Hmac::<Sha256>::new_from_slice(&stored_key)
.expect("HMAC is able to accept all key sizes");
hmac.update(auth_message.as_bytes());
let client_signature = hmac.finalize().into_bytes();
let mut client_proof = client_key;
for (proof, signature) in client_proof.iter_mut().zip(client_signature) {
*proof ^= signature;
}
write!(&mut self.message, ",p={}", base64::encode(&*client_proof)).unwrap();
self.auth_message = auth_message;
Ok(self.message.clone())
}
pub fn finish(&mut self, message: &BytesMut) -> Result<(), Error> {
let final_message = FinalMessage::parse(message)?;
let verifier = match base64::decode(&final_message.value) {
Ok(verifier) => verifier,
Err(_) => return Err(Error::ProtocolSyncError),
};
let mut hmac = Hmac::<Sha256>::new_from_slice(&self.salted_password)
.expect("HMAC is able to accept all key sizes");
hmac.update(b"Server Key");
let server_key = hmac.finalize().into_bytes();
let mut hmac = Hmac::<Sha256>::new_from_slice(&server_key)
.expect("HMAC is able to accept all key sizes");
hmac.update(self.auth_message.as_bytes());
match hmac.verify_slice(&verifier) {
Ok(_) => Ok(()),
Err(_) => return Err(Error::ServerError),
}
}
// https://github.com/sfackler/rust-postgres/blob/c3a029e60c1c0bd0be947049859b8fa5bd5ac220/postgres-protocol/src/authentication/sasl.rs#L35
fn hi(str: &[u8], salt: &[u8], i: u32) -> [u8; 32] {
let mut hmac =
Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
hmac.update(salt);
hmac.update(&[0, 0, 0, 1]);
let mut prev = hmac.finalize().into_bytes();
let mut hi = prev;
for _ in 1..i {
let mut hmac = Hmac::<Sha256>::new_from_slice(str).expect("already checked above");
hmac.update(&prev);
prev = hmac.finalize().into_bytes();
for (hi, prev) in hi.iter_mut().zip(prev) {
*hi ^= prev;
}
}
hi.into()
}
}
#[derive(Default, Debug)]
struct Message {
nonce: String,
salt: String,
iterations: u32,
}
impl Message {
fn parse(message: &BytesMut) -> Result<Message, Error> {
if !message.starts_with(b"r=") {
return Err(Error::ProtocolSyncError);
}
let mut i = 2;
while message[i] != b',' && i < message.len() {
i += 1;
}
let nonce = String::from_utf8_lossy(&message[2..i]).to_string();
// Skip the ,
i += 1;
if !&message[i..].starts_with(b"s=") {
return Err(Error::ProtocolSyncError);
}
// Skip the s=
i += 2;
let s = i;
while message[i] != b',' && i < message.len() {
i += 1;
}
let salt = String::from_utf8_lossy(&message[s..i]).to_string();
// Skip the ,
i += 1;
if !&message[i..].starts_with(b"i=") {
return Err(Error::ProtocolSyncError);
}
i += 2;
let iterations = match String::from_utf8_lossy(&message[i..]).parse::<u32>() {
Ok(it) => it,
Err(_) => return Err(Error::ProtocolSyncError),
};
Ok(Message {
nonce,
salt,
iterations,
})
}
}
struct FinalMessage {
value: String,
}
impl FinalMessage {
pub fn parse(message: &BytesMut) -> Result<FinalMessage, Error> {
if !message.starts_with(b"v=") {
return Err(Error::ProtocolSyncError);
}
Ok(FinalMessage {
value: String::from_utf8_lossy(&message[2..]).to_string(),
})
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn parse_server_first_message() {
let message = BytesMut::from(
&"r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096".as_bytes()[..],
);
let message = Message::parse(&message).unwrap();
assert_eq!(message.nonce, "fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j");
assert_eq!(message.salt, "QSXCR+Q6sek8bf92");
assert_eq!(message.iterations, 4096);
}
#[test]
fn parse_server_last_message() {
let f = FinalMessage::parse(&BytesMut::from(
&"v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw".as_bytes()[..],
))
.unwrap();
assert_eq!(
f.value,
"U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw".to_string()
);
}
// recorded auth exchange from psql
#[test]
fn exchange() {
let password = "foobar";
let nonce = "9IZ2O01zb9IgiIZ1WJ/zgpJB";
let client_first = "n,,n=,r=9IZ2O01zb9IgiIZ1WJ/zgpJB";
let server_first =
"r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\
=4096";
let client_final =
"c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\
1NTlQYNs5BTeQjdHdk7lOflDo5re2an8=";
let server_final = "v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw=";
let mut scram = ScramSha256::from_nonce(password, nonce);
let message = scram.message();
assert_eq!(std::str::from_utf8(&message).unwrap(), client_first);
let result = scram
.update(&BytesMut::from(&server_first.as_bytes()[..]))
.unwrap();
assert_eq!(std::str::from_utf8(&result).unwrap(), client_final);
scram
.finish(&BytesMut::from(&server_final.as_bytes()[..]))
.unwrap();
}
}

View File

@@ -1,6 +1,6 @@
/// Implementation of the PostgreSQL server (database) protocol.
/// Here we are pretending to the a Postgres client.
use bytes::{Buf, BufMut, BytesMut};
///! Implementation of the PostgreSQL server (database) protocol.
///! Here we are pretending to the a Postgres client.
use log::{debug, error, info, trace};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
@@ -12,48 +12,52 @@ use crate::config::{Address, User};
use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
use crate::scram::ScramSha256;
use crate::stats::Reporter;
use crate::ClientServerMap;
/// Server state.
pub struct Server {
// Server host, e.g. localhost,
// port, e.g. 5432, and role, e.g. primary or replica.
/// Server host, e.g. localhost,
/// port, e.g. 5432, and role, e.g. primary or replica.
address: Address,
// Buffered read socket.
/// Buffered read socket.
read: BufReader<OwnedReadHalf>,
// Unbuffered write socket (our client code buffers).
/// Unbuffered write socket (our client code buffers).
write: OwnedWriteHalf,
// Our server response buffer. We buffer data before we give it to the client.
/// Our server response buffer. We buffer data before we give it to the client.
buffer: BytesMut,
// Server information the server sent us over on startup.
/// Server information the server sent us over on startup.
server_info: BytesMut,
// Backend id and secret key used for query cancellation.
/// Backend id and secret key used for query cancellation.
process_id: i32,
secret_key: i32,
// Is the server inside a transaction or idle.
/// Is the server inside a transaction or idle.
in_transaction: bool,
// Is there more data for the client to read.
/// Is there more data for the client to read.
data_available: bool,
// Is the server broken? We'll remote it from the pool if so.
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,
// Mapping of clients and servers used for query cancellation.
/// Mapping of clients and servers used for query cancellation.
client_server_map: ClientServerMap,
// Server connected at.
/// Server connected at.
connected_at: chrono::naive::NaiveDateTime,
// Reports various metrics, e.g. data sent & received.
/// Reports various metrics, e.g. data sent & received.
stats: Reporter,
/// Application name using the server at the moment.
application_name: String,
}
impl Server {
@@ -77,7 +81,7 @@ impl Server {
trace!("Sending StartupMessage");
// Send the startup packet telling the server we're a normal Postgres client.
// StartupMessage
startup(&mut stream, &user.name, database).await?;
let mut server_info = BytesMut::new();
@@ -86,6 +90,8 @@ impl Server {
// We'll be handling multiple packets, but they will all be structured the same.
// We'll loop here until this exchange is complete.
let mut scram = ScramSha256::new(&user.password);
loop {
let code = match stream.read_u8().await {
Ok(code) => code as char,
@@ -127,6 +133,83 @@ impl Server {
AUTHENTICATION_SUCCESSFUL => (),
SASL => {
debug!("Starting SASL authentication");
let sasl_len = (len - 8) as usize;
let mut sasl_auth = vec![0u8; sasl_len];
match stream.read_exact(&mut sasl_auth).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
if sasl_type == SCRAM_SHA_256 {
debug!("Using {}", SCRAM_SHA_256);
// Send client message
let sasl_response = scram.message();
let mut res = BytesMut::new();
res.put_u8(b'p');
res.put_i32(
4 + SCRAM_SHA_256.len() as i32
+ 1
+ sasl_response.len() as i32
+ 4,
);
res.put_slice(&format!("{}\0", SCRAM_SHA_256).as_bytes()[..]);
res.put_i32(sasl_response.len() as i32);
res.put(sasl_response);
write_all(&mut stream, res).await?;
} else {
error!("Unsupported SCRAM version: {}", sasl_type);
return Err(Error::ServerError);
}
}
SASL_CONTINUE => {
trace!("Continuing SASL");
let mut sasl_data = vec![0u8; (len - 8) as usize];
match stream.read_exact(&mut sasl_data).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
let msg = BytesMut::from(&sasl_data[..]);
let sasl_response = scram.update(&msg)?;
let mut res = BytesMut::new();
res.put_u8(b'p');
res.put_i32(4 + sasl_response.len() as i32);
res.put(sasl_response);
write_all(&mut stream, res).await?;
}
SASL_FINAL => {
trace!("Final SASL");
let mut sasl_final = vec![0u8; len as usize - 8];
match stream.read_exact(&mut sasl_final).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
match scram.finish(&BytesMut::from(&sasl_final[..])) {
Ok(_) => {
debug!("SASL authentication successful");
}
Err(err) => {
debug!("SASL authentication failed");
return Err(err);
}
};
}
_ => {
error!("Unsupported authentication mechanism: {}", auth_code);
return Err(Error::ServerError);
@@ -187,7 +270,7 @@ impl Server {
// BackendKeyData
'K' => {
// The frontend must save these values if it wishes to be able to issue CancelRequest messages later.
// See: https://www.postgresql.org/docs/12/protocol-message-formats.html
// See: <https://www.postgresql.org/docs/12/protocol-message-formats.html>.
process_id = match stream.read_i32().await {
Ok(id) => id,
Err(_) => return Err(Error::SocketError),
@@ -208,11 +291,9 @@ impl Server {
Err(_) => return Err(Error::SocketError),
};
// This is the last step in the client-server connection setup,
// and indicates the server is ready for to query it.
let (read, write) = stream.into_split();
return Ok(Server {
let mut server = Server {
address: address.clone(),
read: BufReader::new(read),
write: write,
@@ -226,7 +307,12 @@ impl Server {
client_server_map: client_server_map,
connected_at: chrono::offset::Utc::now().naive_utc(),
stats: stats,
});
application_name: String::new(),
};
server.set_name("pgcat").await?;
return Ok(server);
}
// We have an unexpected message from the server during this exchange.
@@ -342,8 +428,7 @@ impl Server {
// More data is available after this message, this is not the end of the reply.
self.data_available = true;
// Don't flush yet, the more we buffer, the faster this goes...
// up to a limit of course.
// Don't flush yet, the more we buffer, the faster this goes...up to a limit.
if self.buffer.len() >= 8196 {
break;
}
@@ -411,7 +496,7 @@ impl Server {
/// Indicate that this server connection cannot be re-used and must be discarded.
pub fn mark_bad(&mut self) {
error!("Server marked bad");
error!("Server {:?} marked bad", self.address);
self.bad = true;
}
@@ -451,9 +536,14 @@ impl Server {
/// A shorthand for `SET application_name = $1`.
#[allow(dead_code)]
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?)
if self.application_name != name {
self.application_name = name.to_string();
Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?)
} else {
Ok(())
}
}
/// Get the servers address.
@@ -462,6 +552,7 @@ impl Server {
self.address.clone()
}
/// Get the server's unique identifier.
pub fn process_id(&self) -> i32 {
self.process_id
}
@@ -481,9 +572,10 @@ impl Drop for Server {
match self.write.try_write(&bytes) {
Ok(_) => (),
Err(_) => (),
Err(_) => debug!("Dirty shutdown"),
};
// Should not matter.
self.bad = true;
let now = chrono::offset::Utc::now().naive_utc();

View File

@@ -1,20 +1,27 @@
/// Implements various sharding functions.
use sha1::{Digest, Sha1};
// https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20
/// See: <https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20>.
const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD;
/// The sharding functions we support.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum ShardingFunction {
PgBigintHash,
Sha1,
}
/// The sharder.
pub struct Sharder {
/// Number of shards in the cluster.
shards: usize,
/// The sharding function in use.
sharding_function: ShardingFunction,
}
impl Sharder {
/// Create new instance of the sharder.
pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder {
Sharder {
shards,
@@ -22,6 +29,7 @@ impl Sharder {
}
}
/// Compute the shard given sharding key.
pub fn shard(&self, key: i64) -> usize {
match self.sharding_function {
ShardingFunction::PgBigintHash => self.pg_bigint_hash(key),
@@ -31,7 +39,7 @@ impl Sharder {
/// Hash function used by Postgres to determine which partition
/// to put the row in when using HASH(column) partitioning.
/// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631
/// Source: <https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631>.
/// Supports only 1 bigint at the moment, but we can add more later.
fn pg_bigint_hash(&self, key: i64) -> usize {
let mut lohalf = key as u32;
@@ -119,6 +127,7 @@ impl Sharder {
a
}
#[inline]
fn pg_u32_hash(k: u32) -> u64 {
let mut a: u32 = 0x9e3779b9 as u32 + std::mem::size_of::<u32>() as u32 + 3923095 as u32;
let mut b = a;

View File

@@ -5,12 +5,12 @@ use parking_lot::Mutex;
use std::collections::HashMap;
use tokio::sync::mpsc::{Receiver, Sender};
// Latest stats updated every second; used in SHOW STATS and other admin commands.
/// Latest stats updated every second; used in SHOW STATS and other admin commands.
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
// Statistics period used for average calculations.
// 15 seconds.
/// Statistics period used for average calculations.
/// 15 seconds.
static STAT_PERIOD: u64 = 15000;
/// The names for the events reported

View File

@@ -2,3 +2,4 @@ source "https://rubygems.org"
gem "pg"
gem "activerecord"
gem "rubocop"

View File

@@ -11,13 +11,33 @@ GEM
i18n (>= 1.6, < 2)
minitest (>= 5.1)
tzinfo (~> 2.0)
ast (2.4.2)
concurrent-ruby (1.1.9)
i18n (1.10.0)
concurrent-ruby (~> 1.0)
minitest (5.15.0)
parallel (1.22.1)
parser (3.1.2.0)
ast (~> 2.4.1)
pg (1.3.2)
rainbow (3.1.1)
regexp_parser (2.3.1)
rexml (3.2.5)
rubocop (1.29.0)
parallel (~> 1.10)
parser (>= 3.1.0.0)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.8, < 3.0)
rexml (>= 3.2.5, < 4.0)
rubocop-ast (>= 1.17.0, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 1.4.0, < 3.0)
rubocop-ast (1.17.0)
parser (>= 3.1.1.0)
ruby-progressbar (1.11.0)
tzinfo (2.0.4)
concurrent-ruby (~> 1.0)
unicode-display_width (2.1.0)
PLATFORMS
x86_64-linux
@@ -25,6 +45,7 @@ PLATFORMS
DEPENDENCIES
activerecord
pg
rubocop
BUNDLED WITH
2.3.7

View File

@@ -1,6 +1,9 @@
# frozen_string_literal: true
require 'active_record'
require 'pg'
$stdout.sync = true
# Uncomment these two to see all queries.
# ActiveRecord.verbose_query_logs = true
@@ -13,6 +16,7 @@ ActiveRecord::Base.establish_connection(
username: 'sharding_user',
password: 'sharding_user',
database: 'rails_dev',
application_name: 'testing_pgcat',
prepared_statements: false, # Transaction mode
advisory_locks: false # Same
)
@@ -110,3 +114,17 @@ begin
rescue ActiveRecord::StatementInvalid
puts 'OK'
end
# Test evil clients
def poorly_behaved_client
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/rails_dev?application_name=testing_pgcat")
conn.async_exec 'BEGIN'
conn.async_exec 'SELECT 1'
conn.close
puts 'Bad client ok'
end
25.times do
poorly_behaved_client
end