Compare commits

..

14 Commits

Author SHA1 Message Date
Lev Kokotov
d412238f47 Implement SCRAM-SHA-256 for server authentication (PG14) (#76)
* Implement SCRAM-SHA-256

* test it

* trace

* move to community for auth

* hmm
2022-06-18 18:36:00 -07:00
dependabot[bot]
7782933f59 Bump regex from 1.5.4 to 1.5.5 (#75)
Bumps [regex](https://github.com/rust-lang/regex) from 1.5.4 to 1.5.5.
- [Release notes](https://github.com/rust-lang/regex/releases)
- [Changelog](https://github.com/rust-lang/regex/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/regex/compare/1.5.4...1.5.5)

---
updated-dependencies:
- dependency-name: regex
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2022-06-06 19:59:50 -07:00
Lev Kokotov
bac4e1f52c Only set application_name if it's different (#74)
* Only set application_name if it's different

* keep server named pgcat until something else changes
2022-06-05 09:48:06 -07:00
Lev Kokotov
37e3a86881 Pass application_name to server (#73)
* Pass application_name to server

* fmt
2022-06-03 00:15:50 -07:00
Lev Kokotov
61db13f614 Fix memory leak in client/server mapping (#71) 2022-05-18 16:24:03 -07:00
Lev Kokotov
fe32b5ef17 Reduce traffic on the stats channel (#69) 2022-05-17 13:05:25 -07:00
Lev Kokotov
54699222f8 Possible fix for clients waiting stat leak (#68) 2022-05-14 21:35:33 -07:00
Lev Kokotov
ccbca66e7a Poorly behaved client fix (#65)
* Poorly behaved client fix

* yes officer

* fix tests

* no useless rescue

* Looks ok
2022-05-09 09:09:22 -07:00
Lev Kokotov
df85139281 Update README. Comments. Version bump. (#60)
* update readme

* comments

* just a version bump
2022-03-10 01:33:29 -08:00
Lev Kokotov
509e4815a3 Update README.md 2022-03-08 17:48:26 -08:00
Lev Kokotov
5338ff2323 Update README.md 2022-03-08 17:46:46 -08:00
Lev Kokotov
1ea0a7f332 Update README.md 2022-03-08 17:45:54 -08:00
Lev Kokotov
d1b86d363d Update README.md 2022-03-08 17:38:51 -08:00
Lev Kokotov
b309ead58f Handle SIGTERM. Add docker-compose.yml (#59)
* docker-compose

* remove statsd config

* readme
2022-03-08 17:18:48 -08:00
26 changed files with 908 additions and 203 deletions

View File

@@ -12,13 +12,15 @@ jobs:
- image: cimg/rust:1.58.1 - image: cimg/rust:1.58.1
environment: environment:
RUST_LOG: info RUST_LOG: info
- image: cimg/postgres:14.0 - image: postgres:14
auth: # auth:
username: mydockerhub-user # username: mydockerhub-user
password: $DOCKERHUB_PASSWORD # password: $DOCKERHUB_PASSWORD
environment: environment:
POSTGRES_USER: postgres POSTGRES_USER: postgres
POSTGRES_DB: postgres POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
# Add steps to the job # Add steps to the job
# See: https://circleci.com/docs/2.0/configuration-reference/#steps # See: https://circleci.com/docs/2.0/configuration-reference/#steps
steps: steps:

View File

@@ -29,9 +29,6 @@ healthcheck_timeout = 100
# For how long to ban a server if it fails a health check (seconds). # For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds ban_time = 60 # Seconds
# Stats will be sent here
statsd_address = "127.0.0.1:8125"
# #
# User to use for authentication against the server. # User to use for authentication against the server.
[user] [user]

View File

@@ -12,7 +12,7 @@ function start_pgcat() {
} }
# Setup the database with shards and user # Setup the database with shards and user
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
@@ -72,7 +72,7 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null) (! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
# Start PgCat in debug to demonstrate failover better # Start PgCat in debug to demonstrate failover better
start_pgcat "debug" start_pgcat "trace"
# Add latency to the replica at port 5433 slightly above the healthcheck timeout # Add latency to the replica at port 5433 slightly above the healthcheck timeout
toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica

93
Cargo.lock generated
View File

@@ -45,6 +45,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]] [[package]]
name = "bb8" name = "bb8"
version = "0.7.1" version = "0.7.1"
@@ -109,22 +115,23 @@ dependencies = [
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.1" version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d6b536309245c849479fba3da410962a43ed8e51c26b729208ec0ac2798d0" checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
dependencies = [ dependencies = [
"generic-array", "generic-array",
"typenum",
] ]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.1" version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b697d66081d42af4fba142d56918a3cb21dc8eb63372c6b85d14f44fb9c5979b" checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [ dependencies = [
"block-buffer", "block-buffer",
"crypto-common", "crypto-common",
"generic-array", "subtle",
] ]
[[package]] [[package]]
@@ -205,6 +212,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
]
[[package]] [[package]]
name = "humantime" name = "humantime"
version = "2.1.0" version = "2.1.0"
@@ -352,14 +368,16 @@ dependencies = [
[[package]] [[package]]
name = "pgcat" name = "pgcat"
version = "0.1.0" version = "0.1.0-beta2"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"base64",
"bb8", "bb8",
"bytes", "bytes",
"chrono", "chrono",
"env_logger", "env_logger",
"hmac",
"log", "log",
"md-5", "md-5",
"num_cpus", "num_cpus",
@@ -370,7 +388,9 @@ dependencies = [
"serde", "serde",
"serde_derive", "serde_derive",
"sha-1", "sha-1",
"sha2",
"sqlparser", "sqlparser",
"stringprep",
"tokio", "tokio",
"toml", "toml",
] ]
@@ -462,9 +482,9 @@ dependencies = [
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.5.4" version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@@ -511,6 +531,17 @@ dependencies = [
"digest", "digest",
] ]
[[package]]
name = "sha2"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.0" version = "1.4.0"
@@ -541,6 +572,22 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.86" version = "1.0.86"
@@ -572,6 +619,21 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.16.1" version = "1.16.1"
@@ -617,6 +679,21 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "unicode-bidi"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-normalization"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
dependencies = [
"tinyvec",
]
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
version = "0.2.2" version = "0.2.2"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "pgcat" name = "pgcat"
version = "0.1.0" version = "0.2.0-beta1"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -25,3 +25,7 @@ log = "0.4"
arc-swap = "1" arc-swap = "1"
env_logger = "0.9" env_logger = "0.9"
parking_lot = "0.11" parking_lot = "0.11"
hmac = "0.12"
sha2 = "0.10"
base64 = "0.13"
stringprep = "0.1"

View File

@@ -8,4 +8,4 @@ COPY --from=builder /app/target/release/pgcat /usr/bin/pgcat
COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat WORKDIR /etc/pgcat
ENV RUST_LOG=info ENV RUST_LOG=info
ENTRYPOINT ["/usr/bin/pgcat"] CMD ["pgcat"]

View File

@@ -4,9 +4,9 @@
![PgCat](./pgcat3.png) ![PgCat](./pgcat3.png)
Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover support. PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
**Alpha**: looking for alpha testers, see [#35](https://github.com/levkk/pgcat/issues/35). **Beta**: looking for beta testers, see [#35](https://github.com/levkk/pgcat/issues/35).
## Features ## Features
| **Feature** | **Status** | **Comments** | | **Feature** | **Status** | **Comments** |
@@ -18,14 +18,23 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su
| Load balancing of read queries | :white_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). | | Load balancing of read queries | :white_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. | | Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. | | Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
| Statistics reporting | :white_check_mark: | Statistics similar to PgBouncers are reported via StatsD. | | Statistics | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. |
| Live configuration reloading | :construction_worker: | Reload config with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)`. Not all settings can be reloaded without a restart. | | Live configuration reloading | :white_check_mark: | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. |
| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. | | Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. |
| Admin database | :white_check_mark: | The admin database, similar to PgBouncer's, allows to query for statistics and reload the configuration. |
## Deployment ## Deployment
See `Dockerfile` for example deployment using Docker. The pooler is configured to spawn 4 workers so 4 CPUs are recommended for optimal performance. See `Dockerfile` for example deployment using Docker. The pooler is configured to spawn 4 workers so 4 CPUs are recommended for optimal performance. That setting can be adjusted to spawn as many (or as few) workers as needed.
That setting can be adjusted to spawn as many (or as little) workers as needed.
For quick local example, use the Docker Compose environment provided:
```bash
docker-compose up
# In a new terminal:
psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
```
### Config ### Config
@@ -39,7 +48,6 @@ That setting can be adjusted to spawn as many (or as little) workers as needed.
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` | | `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` | | `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` | | `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| `statsd_address` | StatsD host and port. Statistics will be sent there every 15 seconds. | `127.0.0.1:8125` |
| | | | | | | |
| **`user`** | | | | **`user`** | | |
| `name` | The user name. | `sharding_user` | | `name` | The user name. | `sharding_user` |
@@ -82,7 +90,7 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing.
| Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. | | Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. |
| Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. | | Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. |
| Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for a Rails/ActiveRecord example. | | Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for a Rails/ActiveRecord example. |
| Statistics reporting | :x: | :white_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. | | Statistics | :white_check_mark: | :white_check_mark: | Query the admin database with `psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS'`. |
| Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. | | Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
## Usage ## Usage
@@ -225,11 +233,15 @@ SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts unt
### Statistics reporting ### Statistics reporting
Stats are reported using StatsD every 15 seconds. The address is configurable with `statsd_address`, the default is `127.0.0.1:8125`. The stats are very similar to what Pgbouncer reports and the names are kept to be comparable. The stats are very similar to what Pgbouncer reports and the names are kept to be comparable. They are accessible by querying the admin database `pgcat`, and `pgbouncer` for compatibility.
```
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
```
### Live configuration reloading ### Live configuration reloading
The config can be reloaded by sending a `kill -s SIGHUP` to the process. Not all settings are currently supported by live reload: The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. Not all settings are currently supported by live reload:
| **Config** | **Requires restart** | | **Config** | **Requires restart** |
|-------------------------|----------------------| |-------------------------|----------------------|
@@ -239,7 +251,6 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process. Not all
| `connect_timeout` | yes | | `connect_timeout` | yes |
| `healthcheck_timeout` | no | | `healthcheck_timeout` | no |
| `ban_time` | no | | `ban_time` | no |
| `statsd_address` | yes |
| `user` | yes | | `user` | yes |
| `shards` | yes | | `shards` | yes |
| `default_role` | no | | `default_role` | no |

16
docker-compose.yml Normal file
View File

@@ -0,0 +1,16 @@
version: "3"
services:
postgres:
image: postgres:13
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: md5
pgcat:
build: .
command:
- "pgcat"
- "/etc/pgcat/pgcat.toml"
volumes:
- "${PWD}/examples/docker/pgcat.toml:/etc/pgcat/pgcat.toml"
ports:
- "6432:6432"

105
examples/docker/pgcat.toml Normal file
View File

@@ -0,0 +1,105 @@
#
# PgCat config example.
#
#
# General pooler settings
[general]
# What IP to run on, 0.0.0.0 means accessible from everywhere.
host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = 6432
# How many connections to allocate per server.
pool_size = 15
# Pool mode (see PgBouncer docs for more).
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
# How long to wait before aborting a server connection (ms).
connect_timeout = 5000
# How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout = 1000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
#
# User to use for authentication against the server.
[user]
name = "postgres"
password = "postgres"
#
# Shards in the cluster
[shards]
# Shard 0
[shards.0]
# [ host, port, role ]
servers = [
[ "postgres", 5432, "primary" ],
[ "postgres", 5432, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
# Database name (e.g. "postgres")
database = "postgres"
[shards.1]
# [ host, port, role ]
servers = [
[ "postgres", 5432, "primary" ],
[ "postgres", 5432, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
database = "postgres"
[shards.2]
# [ host, port, role ]
servers = [
[ "postgres", 5432, "primary" ],
[ "postgres", 5432, "replica" ],
# [ "127.0.1.1", 5432, "replica" ],
]
database = "postgres"
# Settings for our query routing layer.
[query_router]
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"
# Query parser. If enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = false
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"

View File

@@ -29,9 +29,6 @@ healthcheck_timeout = 1000
# For how long to ban a server if it fails a health check (seconds). # For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds ban_time = 60 # Seconds
# Stats will be sent here
statsd_address = "127.0.0.1:8125"
# #
# User to use for authentication against the server. # User to use for authentication against the server.
[user] [user]

View File

@@ -1,8 +1,8 @@
/// Admin database.
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::{info, trace}; use log::{info, trace};
use tokio::net::tcp::OwnedWriteHalf;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::net::tcp::OwnedWriteHalf;
use crate::config::{get_config, parse}; use crate::config::{get_config, parse};
use crate::errors::Error; use crate::errors::Error;
@@ -10,7 +10,7 @@ use crate::messages::*;
use crate::pool::ConnectionPool; use crate::pool::ConnectionPool;
use crate::stats::get_stats; use crate::stats::get_stats;
/// Handle admin client /// Handle admin client.
pub async fn handle_admin( pub async fn handle_admin(
stream: &mut OwnedWriteHalf, stream: &mut OwnedWriteHalf,
mut query: BytesMut, mut query: BytesMut,
@@ -58,7 +58,7 @@ pub async fn handle_admin(
} }
} }
/// SHOW LISTS /// Column-oriented statistics.
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats(); let stats = get_stats();
@@ -125,7 +125,7 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
write_all_half(stream, res).await write_all_half(stream, res).await
} }
/// SHOW VERSION /// Show PgCat version.
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> { async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let mut res = BytesMut::new(); let mut res = BytesMut::new();
@@ -140,7 +140,7 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
write_all_half(stream, res).await write_all_half(stream, res).await
} }
/// SHOW POOLS /// Show utilization of connection pools for each shard and replicas.
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let stats = get_stats(); let stats = get_stats();
let config = { let config = {
@@ -189,6 +189,7 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
res.put(command_complete("SHOW")); res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z'); res.put_u8(b'Z');
res.put_i32(5); res.put_i32(5);
res.put_u8(b'I'); res.put_u8(b'I');
@@ -196,7 +197,7 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
write_all_half(stream, res).await write_all_half(stream, res).await
} }
/// SHOW DATABASES /// Show shards and replicas.
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let guard = get_config(); let guard = get_config();
let config = &*guard.clone(); let config = &*guard.clone();
@@ -221,7 +222,6 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R
let mut res = BytesMut::new(); let mut res = BytesMut::new();
// RowDescription
res.put(row_description(&columns)); res.put(row_description(&columns));
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
@@ -265,7 +265,7 @@ async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
custom_protocol_response_ok(stream, "SET").await custom_protocol_response_ok(stream, "SET").await
} }
/// RELOAD /// Reload the configuration file without restarting the process.
async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> { async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
info!("Reloading config"); info!("Reloading config");
@@ -280,7 +280,6 @@ async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let mut res = BytesMut::new(); let mut res = BytesMut::new();
// CommandComplete
res.put(command_complete("RELOAD")); res.put(command_complete("RELOAD"));
// ReadyForQuery // ReadyForQuery
@@ -291,13 +290,14 @@ async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
write_all_half(stream, res).await write_all_half(stream, res).await
} }
/// Shows current configuration.
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> { async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
let guard = get_config(); let guard = get_config();
let config = &*guard.clone(); let config = &*guard.clone();
let config: HashMap<String, String> = config.into(); let config: HashMap<String, String> = config.into();
drop(guard); drop(guard);
// Configs that cannot be changed dynamically. // Configs that cannot be changed without restarting.
let immutables = ["host", "port", "connect_timeout"]; let immutables = ["host", "port", "connect_timeout"];
// Columns // Columns
@@ -327,6 +327,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
res.put(command_complete("SHOW")); res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z'); res.put_u8(b'Z');
res.put_i32(5); res.put_i32(5);
res.put_u8(b'I'); res.put_u8(b'I');
@@ -334,7 +335,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
write_all_half(stream, res).await write_all_half(stream, res).await
} }
/// SHOW STATS /// Show shard and replicas statistics.
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
let columns = vec![ let columns = vec![
("database", DataType::Text), ("database", DataType::Text),
@@ -378,6 +379,7 @@ async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
res.put(command_complete("SHOW")); res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z'); res.put_u8(b'Z');
res.put_i32(5); res.put_i32(5);
res.put_u8(b'I'); res.put_u8(b'I');

View File

@@ -1,16 +1,13 @@
/// Implementation of the PostgreSQL client. /// Handle clients by pretending to be a PostgreSQL server.
/// We are pretending to the server in this scenario,
/// and this module implements that.
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error, trace}; use log::{debug, error, trace};
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, BufReader}; use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{ use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf}, tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream, TcpStream,
}; };
use std::collections::HashMap;
use crate::admin::handle_admin; use crate::admin::handle_admin;
use crate::config::get_config; use crate::config::get_config;
use crate::constants::*; use crate::constants::*;
@@ -23,53 +20,52 @@ use crate::stats::Reporter;
/// The client state. One of these is created per client. /// The client state. One of these is created per client.
pub struct Client { pub struct Client {
// The reads are buffered (8K by default). /// The reads are buffered (8K by default).
read: BufReader<OwnedReadHalf>, read: BufReader<OwnedReadHalf>,
// We buffer the writes ourselves because we know the protocol /// We buffer the writes ourselves because we know the protocol
// better than a stock buffer. /// better than a stock buffer.
write: OwnedWriteHalf, write: OwnedWriteHalf,
// Internal buffer, where we place messages until we have to flush /// Internal buffer, where we place messages until we have to flush
// them to the backend. /// them to the backend.
buffer: BytesMut, buffer: BytesMut,
// The client was started with the sole reason to cancel another running query. /// The client was started with the sole reason to cancel another running query.
cancel_mode: bool, cancel_mode: bool,
// In transaction mode, the connection is released after each transaction. /// In transaction mode, the connection is released after each transaction.
// Session mode has slightly higher throughput per client, but lower capacity. /// Session mode has slightly higher throughput per client, but lower capacity.
transaction_mode: bool, transaction_mode: bool,
// For query cancellation, the client is given a random process ID and secret on startup. /// For query cancellation, the client is given a random process ID and secret on startup.
process_id: i32, process_id: i32,
secret_key: i32, secret_key: i32,
// Clients are mapped to servers while they use them. This allows a client /// Clients are mapped to servers while they use them. This allows a client
// to connect and cancel a query. /// to connect and cancel a query.
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
// Client parameters, e.g. user, client_encoding, etc. /// Client parameters, e.g. user, client_encoding, etc.
#[allow(dead_code)] #[allow(dead_code)]
parameters: HashMap<String, String>, parameters: HashMap<String, String>,
// Statistics /// Statistics
stats: Reporter, stats: Reporter,
// Clients want to talk to admin /// Clients want to talk to admin database.
admin: bool, admin: bool,
// Last address the client talked to /// Last address the client talked to.
last_address_id: Option<usize>, last_address_id: Option<usize>,
// Last server process id we talked to /// Last server process id we talked to.
last_server_id: Option<i32>, last_server_id: Option<i32>,
} }
impl Client { impl Client {
/// Given a TCP socket, trick the client into thinking we are /// Perform client startup sequence.
/// the Postgres server. Perform the authentication and place /// See docs: <https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.3>
/// the client in query-ready mode.
pub async fn startup( pub async fn startup(
mut stream: TcpStream, mut stream: TcpStream,
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
@@ -82,14 +78,12 @@ impl Client {
loop { loop {
trace!("Waiting for StartupMessage"); trace!("Waiting for StartupMessage");
// Could be StartupMessage or SSLRequest // Could be StartupMessage, SSLRequest or CancelRequest.
// which makes this variable length.
let len = match stream.read_i32().await { let len = match stream.read_i32().await {
Ok(len) => len, Ok(len) => len,
Err(_) => return Err(Error::ClientBadStartup), Err(_) => return Err(Error::ClientBadStartup),
}; };
// Read whatever is left.
let mut startup = vec![0u8; len as usize - 4]; let mut startup = vec![0u8; len as usize - 4];
match stream.read_exact(&mut startup).await { match stream.read_exact(&mut startup).await {
@@ -189,7 +183,7 @@ impl Client {
} }
} }
/// Client loop. We handle all messages between the client and the database here. /// Handle a connected and authenticated client.
pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> { pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously. // The client wants to cancel a query it has issued previously.
if self.cancel_mode { if self.cancel_mode {
@@ -225,14 +219,14 @@ impl Client {
// Our custom protocol loop. // Our custom protocol loop.
// We expect the client to either start a transaction with regular queries // We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocols. // or issue commands for our sharding and server selection protocol.
loop { loop {
trace!("Client idle, waiting for message"); trace!("Client idle, waiting for message");
// Read a complete message from the client, which normally would be // Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol). // either a `Q` (query) or `P` (prepare, extended protocol).
// We can parse it here before grabbing a server from the pool, // We can parse it here before grabbing a server from the pool,
// in case the client is sending some control messages, e.g. // in case the client is sending some custom protocol messages, e.g.
// SET SHARDING KEY TO 'bigint'; // SET SHARDING KEY TO 'bigint';
let mut message = read_message(&mut self.read).await?; let mut message = read_message(&mut self.read).await?;
@@ -242,43 +236,48 @@ impl Client {
return Ok(()); return Ok(());
} }
// Handle admin database real quick // Handle admin database queries.
if self.admin { if self.admin {
trace!("Handling admin command"); trace!("Handling admin command");
handle_admin(&mut self.write, message, pool.clone()).await?; handle_admin(&mut self.write, message, pool.clone()).await?;
continue; continue;
} }
// Handle all custom protocol commands here. // Handle all custom protocol commands, if any.
match query_router.try_execute_command(message.clone()) { match query_router.try_execute_command(message.clone()) {
// Normal query // Normal query, not a custom command.
None => { None => {
// Attempt to infer which server we want to query, i.e. primary or replica.
if query_router.query_parser_enabled() && query_router.role() == None { if query_router.query_parser_enabled() && query_router.role() == None {
query_router.infer_role(message.clone()); query_router.infer_role(message.clone());
} }
} }
// SET SHARD TO
Some((Command::SetShard, _)) => { Some((Command::SetShard, _)) => {
custom_protocol_response_ok(&mut self.write, &format!("SET SHARD")).await?; custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
continue; continue;
} }
// SET SHARDING KEY TO
Some((Command::SetShardingKey, _)) => { Some((Command::SetShardingKey, _)) => {
custom_protocol_response_ok(&mut self.write, &format!("SET SHARDING KEY")) custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
.await?;
continue; continue;
} }
// SET SERVER ROLE TO
Some((Command::SetServerRole, _)) => { Some((Command::SetServerRole, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
continue; continue;
} }
// SHOW SERVER ROLE
Some((Command::ShowServerRole, value)) => { Some((Command::ShowServerRole, value)) => {
show_response(&mut self.write, "server role", &value).await?; show_response(&mut self.write, "server role", &value).await?;
continue; continue;
} }
// SHOW SHARD
Some((Command::ShowShard, value)) => { Some((Command::ShowShard, value)) => {
show_response(&mut self.write, "shard", &value).await?; show_response(&mut self.write, "shard", &value).await?;
continue; continue;
@@ -290,7 +289,7 @@ impl Client {
error_response( error_response(
&mut self.write, &mut self.write,
&format!( &format!(
"shard '{}' is more than configured '{}'", "shard {} is more than configured {}",
query_router.shard(), query_router.shard(),
pool.shards() pool.shards()
), ),
@@ -301,7 +300,7 @@ impl Client {
debug!("Waiting for connection from pool"); debug!("Waiting for connection from pool");
// Grab a server from the pool: the client issued a regular query. // Grab a server from the pool.
let connection = match pool let connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id) .get(query_router.shard(), query_router.role(), self.process_id)
.await .await
@@ -322,18 +321,18 @@ impl Client {
let address = connection.1; let address = connection.1;
let server = &mut *reference; let server = &mut *reference;
// Claim this server as mine for query cancellation. // Server is assigned to the client in case the client wants to
// cancel a query later.
server.claim(self.process_id, self.secret_key); server.claim(self.process_id, self.secret_key);
// "disconnect" from the previous server stats-wise // Update statistics.
if let Some(last_address_id) = self.last_address_id { if let Some(last_address_id) = self.last_address_id {
self.stats self.stats
.client_disconnecting(self.process_id, last_address_id); .client_disconnecting(self.process_id, last_address_id);
} }
// Client active & server active
self.stats.client_active(self.process_id, address.id); self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id(), address.id); self.stats.server_active(server.process_id(), address.id);
self.last_address_id = Some(address.id); self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id()); self.last_server_id = Some(server.process_id());
@@ -343,9 +342,20 @@ impl Client {
server.address() server.address()
); );
// Set application_name if any.
// TODO: investigate other parameters and set them too.
if self.parameters.contains_key("application_name") {
server
.set_name(&self.parameters["application_name"])
.await?;
}
// Transaction loop. Multiple queries can be issued by the client here. // Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over, // The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode. // or until the client disconnects if we are in session mode.
//
// If the client is in session mode, no more custom protocol
// commands will be accepted.
loop { loop {
let mut message = if message.len() == 0 { let mut message = if message.len() == 0 {
trace!("Waiting for message inside transaction or in session mode"); trace!("Waiting for message inside transaction or in session mode");
@@ -353,11 +363,13 @@ impl Client {
match read_message(&mut self.read).await { match read_message(&mut self.read).await {
Ok(message) => message, Ok(message) => message,
Err(err) => { Err(err) => {
// Client disconnected without warning. // Client disconnected inside a transaction.
// Clean up the server and re-use it.
// This prevents connection thrashing by bad clients.
if server.in_transaction() { if server.in_transaction() {
// Client left dirty server. Clean up and proceed server.query("ROLLBACK").await?;
// without thrashing this connection. server.query("DISCARD ALL").await?;
server.query("ROLLBACK; DISCARD ALL;").await?; server.set_name("pgcat").await?;
} }
return Err(err); return Err(err);
@@ -383,13 +395,11 @@ impl Client {
'Q' => { 'Q' => {
debug!("Sending query to server"); debug!("Sending query to server");
// TODO: implement retries here for read-only transactions.
server.send(original).await?; server.send(original).await?;
// Read all data the server has to offer, which can be multiple messages // Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks. // buffered in 8196 bytes chunks.
loop { loop {
// TODO: implement retries here for read-only transactions.
let response = server.recv().await?; let response = server.recv().await?;
// Send server reply to the client. // Send server reply to the client.
@@ -409,7 +419,6 @@ impl Client {
// Report query executed statistics. // Report query executed statistics.
self.stats.query(self.process_id, address.id); self.stats.query(self.process_id, address.id);
// The transaction is over, we can release the connection back to the pool.
if !server.in_transaction() { if !server.in_transaction() {
// Report transaction executed statistics. // Report transaction executed statistics.
self.stats.transaction(self.process_id, address.id); self.stats.transaction(self.process_id, address.id);
@@ -429,11 +438,14 @@ impl Client {
// connection before releasing into the pool. // connection before releasing into the pool.
// Pgbouncer closes the connection which leads to // Pgbouncer closes the connection which leads to
// connection thrashing when clients misbehave. // connection thrashing when clients misbehave.
// This pool will protect the database. :salute:
if server.in_transaction() { if server.in_transaction() {
server.query("ROLLBACK; DISCARD ALL;").await?; server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?;
} }
self.release();
return Ok(()); return Ok(());
} }
@@ -468,7 +480,6 @@ impl Client {
self.buffer.put(&original[..]); self.buffer.put(&original[..]);
// TODO: retries for read-only transactions.
server.send(self.buffer.clone()).await?; server.send(self.buffer.clone()).await?;
self.buffer.clear(); self.buffer.clear();
@@ -476,7 +487,6 @@ impl Client {
// Read all data the server has to offer, which can be multiple messages // Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks. // buffered in 8196 bytes chunks.
loop { loop {
// TODO: retries for read-only transactions
let response = server.recv().await?; let response = server.recv().await?;
match write_all_half(&mut self.write, response).await { match write_all_half(&mut self.write, response).await {
@@ -495,11 +505,11 @@ impl Client {
// Report query executed statistics. // Report query executed statistics.
self.stats.query(self.process_id, address.id); self.stats.query(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id); self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id); self.stats.server_idle(server.process_id(), address.id);
break; break;
@@ -529,11 +539,11 @@ impl Client {
} }
}; };
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id); self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id); self.stats.server_idle(server.process_id(), address.id);
break; break;
@@ -556,7 +566,7 @@ impl Client {
} }
} }
/// Release the server from being mine. I can't cancel its queries anymore. /// Release the server from the client: it can't cancel its queries anymore.
pub fn release(&self) { pub fn release(&self) {
let mut guard = self.client_server_map.lock(); let mut guard = self.client_server_map.lock();
guard.remove(&(self.process_id, self.secret_key)); guard.remove(&(self.process_id, self.secret_key));
@@ -565,11 +575,10 @@ impl Client {
impl Drop for Client { impl Drop for Client {
fn drop(&mut self) { fn drop(&mut self) {
// Disconnect the client // Update statistics.
if let Some(address_id) = self.last_address_id { if let Some(address_id) = self.last_address_id {
self.stats.client_disconnecting(self.process_id, address_id); self.stats.client_disconnecting(self.process_id, address_id);
// The server is now idle
if let Some(process_id) = self.last_server_id { if let Some(process_id) = self.last_server_id {
self.stats.server_idle(process_id, address_id); self.stats.server_idle(process_id, address_id);
} }

View File

@@ -1,18 +1,20 @@
/// Parse the configuration file.
use arc_swap::{ArcSwap, Guard}; use arc_swap::{ArcSwap, Guard};
use log::{error, info}; use log::{error, info};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde_derive::Deserialize; use serde_derive::Deserialize;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use toml; use toml;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::errors::Error; use crate::errors::Error;
/// Globally available configuration.
static CONFIG: Lazy<ArcSwap<Config>> = Lazy::new(|| ArcSwap::from_pointee(Config::default())); static CONFIG: Lazy<ArcSwap<Config>> = Lazy::new(|| ArcSwap::from_pointee(Config::default()));
/// Server role: primary or replica.
#[derive(Clone, PartialEq, Deserialize, Hash, std::cmp::Eq, Debug, Copy)] #[derive(Clone, PartialEq, Deserialize, Hash, std::cmp::Eq, Debug, Copy)]
pub enum Role { pub enum Role {
Primary, Primary,
@@ -46,6 +48,7 @@ impl PartialEq<Role> for Option<Role> {
} }
} }
/// Address identifying a PostgreSQL server uniquely.
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
pub struct Address { pub struct Address {
pub id: usize, pub id: usize,
@@ -70,6 +73,7 @@ impl Default for Address {
} }
impl Address { impl Address {
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
pub fn name(&self) -> String { pub fn name(&self) -> String {
match self.role { match self.role {
Role::Primary => format!("shard_{}_primary", self.shard), Role::Primary => format!("shard_{}_primary", self.shard),
@@ -79,6 +83,7 @@ impl Address {
} }
} }
/// PostgreSQL user.
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)] #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)]
pub struct User { pub struct User {
pub name: String, pub name: String,
@@ -94,6 +99,7 @@ impl Default for User {
} }
} }
/// General configuration.
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct General { pub struct General {
pub host: String, pub host: String,
@@ -103,7 +109,6 @@ pub struct General {
pub connect_timeout: u64, pub connect_timeout: u64,
pub healthcheck_timeout: u64, pub healthcheck_timeout: u64,
pub ban_time: i64, pub ban_time: i64,
pub statsd_address: String,
} }
impl Default for General { impl Default for General {
@@ -116,11 +121,11 @@ impl Default for General {
connect_timeout: 5000, connect_timeout: 5000,
healthcheck_timeout: 1000, healthcheck_timeout: 1000,
ban_time: 60, ban_time: 60,
statsd_address: String::from("127.0.0.1:8125"),
} }
} }
} }
/// Shard configuration.
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct Shard { pub struct Shard {
pub servers: Vec<(String, u16, String)>, pub servers: Vec<(String, u16, String)>,
@@ -136,6 +141,7 @@ impl Default for Shard {
} }
} }
/// Query Router configuration.
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct QueryRouter { pub struct QueryRouter {
pub default_role: String, pub default_role: String,
@@ -155,6 +161,7 @@ impl Default for QueryRouter {
} }
} }
/// Configuration wrapper.
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct Config { pub struct Config {
pub path: Option<String>, pub path: Option<String>,
@@ -198,10 +205,6 @@ impl From<&Config> for std::collections::HashMap<String, String> {
config.general.healthcheck_timeout.to_string(), config.general.healthcheck_timeout.to_string(),
), ),
("ban_time".to_string(), config.general.ban_time.to_string()), ("ban_time".to_string(), config.general.ban_time.to_string()),
(
"statsd_address".to_string(),
config.general.statsd_address.to_string(),
),
( (
"default_role".to_string(), "default_role".to_string(),
config.query_router.default_role.to_string(), config.query_router.default_role.to_string(),
@@ -223,6 +226,7 @@ impl From<&Config> for std::collections::HashMap<String, String> {
} }
impl Config { impl Config {
/// Print current configuration.
pub fn show(&self) { pub fn show(&self) {
info!("Pool size: {}", self.general.pool_size); info!("Pool size: {}", self.general.pool_size);
info!("Pool mode: {}", self.general.pool_mode); info!("Pool mode: {}", self.general.pool_mode);
@@ -237,11 +241,14 @@ impl Config {
} }
} }
/// Get a read-only instance of the configuration
/// from anywhere in the app.
/// ArcSwap makes this cheap and quick.
pub fn get_config() -> Guard<Arc<Config>> { pub fn get_config() -> Guard<Arc<Config>> {
CONFIG.load() CONFIG.load()
} }
/// Parse the config. /// Parse the configuration file located at the path.
pub async fn parse(path: &str) -> Result<(), Error> { pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new(); let mut contents = String::new();
let mut file = match File::open(path).await { let mut file = match File::open(path).await {
@@ -352,6 +359,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
config.path = Some(path.to_string()); config.path = Some(path.to_string());
// Update the configuration globally.
CONFIG.store(Arc::new(config.clone())); CONFIG.store(Arc::new(config.clone()));
Ok(()) Ok(())

View File

@@ -1,7 +1,6 @@
/// Various protocol constants, as defined in /// Various protocol constants, as defined in
/// https://www.postgresql.org/docs/12/protocol-message-formats.html /// <https://www.postgresql.org/docs/12/protocol-message-formats.html>
/// and elsewhere in the source code. /// and elsewhere in the source code.
/// Also other constants we use elsewhere.
// Used in the StartupMessage to indicate regular handshake. // Used in the StartupMessage to indicate regular handshake.
pub const PROTOCOL_VERSION_NUMBER: i32 = 196608; pub const PROTOCOL_VERSION_NUMBER: i32 = 196608;
@@ -15,6 +14,13 @@ pub const CANCEL_REQUEST_CODE: i32 = 80877102;
// AuthenticationMD5Password // AuthenticationMD5Password
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5; pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
// SASL
pub const SASL: i32 = 10;
pub const SASL_CONTINUE: i32 = 11;
pub const SASL_FINAL: i32 = 12;
pub const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
pub const NONCE_LENGTH: usize = 24;
// AuthenticationOk // AuthenticationOk
pub const AUTHENTICATION_SUCCESSFUL: i32 = 0; pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;

View File

@@ -1,12 +1,12 @@
/// Errors.
/// Various errors.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Error { pub enum Error {
SocketError, SocketError,
// ClientDisconnected,
ClientBadStartup, ClientBadStartup,
ProtocolSyncError, ProtocolSyncError,
ServerError, ServerError,
// ServerTimeout,
// DirtyServer,
BadConfig, BadConfig,
AllServersDown, AllServersDown,
} }

View File

@@ -54,23 +54,20 @@ mod errors;
mod messages; mod messages;
mod pool; mod pool;
mod query_router; mod query_router;
mod scram;
mod server; mod server;
mod sharding; mod sharding;
mod stats; mod stats;
// Support for query cancellation: this maps our process_ids and
// secret keys to the backend's.
use config::get_config; use config::get_config;
use pool::{ClientServerMap, ConnectionPool}; use pool::{ClientServerMap, ConnectionPool};
use stats::{Collector, Reporter}; use stats::{Collector, Reporter};
/// Main!
#[tokio::main(worker_threads = 4)] #[tokio::main(worker_threads = 4)]
async fn main() { async fn main() {
env_logger::init(); env_logger::init();
info!("Welcome to PgCat! Meow."); info!("Welcome to PgCat! Meow.");
// Prepare regexes
if !query_router::QueryRouter::setup() { if !query_router::QueryRouter::setup() {
error!("Could not setup query router"); error!("Could not setup query router");
return; return;
@@ -84,7 +81,6 @@ async fn main() {
String::from("pgcat.toml") String::from("pgcat.toml")
}; };
// Prepare the config
match config::parse(&config_file).await { match config::parse(&config_file).await {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
@@ -94,8 +90,8 @@ async fn main() {
}; };
let config = get_config(); let config = get_config();
let addr = format!("{}:{}", config.general.host, config.general.port); let addr = format!("{}:{}", config.general.host, config.general.port);
let listener = match TcpListener::bind(&addr).await { let listener = match TcpListener::bind(&addr).await {
Ok(sock) => sock, Ok(sock) => sock,
Err(err) => { Err(err) => {
@@ -105,18 +101,20 @@ async fn main() {
}; };
info!("Running on {}", addr); info!("Running on {}", addr);
config.show(); config.show();
// Tracks which client is connected to which server for query cancellation. // Tracks which client is connected to which server for query cancellation.
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new())); let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
// Collect statistics and send them to StatsD // Statistics reporting.
let (tx, rx) = mpsc::channel(100); let (tx, rx) = mpsc::channel(100);
// Connection pool for all shards and replicas // Connection pool that allows to query all shards and replicas.
let mut pool = let mut pool =
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await; ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
// Statistics collector task.
let collector_tx = tx.clone(); let collector_tx = tx.clone();
let addresses = pool.databases(); let addresses = pool.databases();
tokio::task::spawn(async move { tokio::task::spawn(async move {
@@ -135,7 +133,7 @@ async fn main() {
info!("Waiting for clients"); info!("Waiting for clients");
// Main app runs here. // Client connection loop.
tokio::task::spawn(async move { tokio::task::spawn(async move {
loop { loop {
let pool = pool.clone(); let pool = pool.clone();
@@ -151,7 +149,7 @@ async fn main() {
} }
}; };
// Client goes to another thread, bye. // Handle client.
tokio::task::spawn(async move { tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc(); let start = chrono::offset::Utc::now().naive_utc();
match client::Client::startup(socket, client_server_map, server_info, reporter) match client::Client::startup(socket, client_server_map, server_info, reporter)
@@ -185,7 +183,7 @@ async fn main() {
} }
}); });
// Reload config // Reload config:
// kill -SIGHUP $(pgrep pgcat) // kill -SIGHUP $(pgrep pgcat)
tokio::task::spawn(async move { tokio::task::spawn(async move {
let mut stream = unix_signal(SignalKind::hangup()).unwrap(); let mut stream = unix_signal(SignalKind::hangup()).unwrap();
@@ -205,16 +203,15 @@ async fn main() {
} }
}); });
// Setup shut down sequence // Exit on Ctrl-C (SIGINT) and SIGTERM.
match signal::ctrl_c().await { let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
Ok(()) => {
info!("Shutting down...");
}
Err(err) => { tokio::select! {
error!("Unable to listen for shutdown signal: {}", err); _ = signal::ctrl_c() => (),
} _ = term_signal.recv() => (),
}; };
info!("Shutting down...");
} }
/// Format chrono::Duration to be more human-friendly. /// Format chrono::Duration to be more human-friendly.

View File

@@ -222,7 +222,7 @@ pub async fn custom_protocol_response_ok(
/// Send a custom error message to the client. /// Send a custom error message to the client.
/// Tell the client we are ready for the next query and no rollback is necessary. /// Tell the client we are ready for the next query and no rollback is necessary.
/// Docs on error codes: https://www.postgresql.org/docs/12/errcodes-appendix.html /// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Result<(), Error> { pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Result<(), Error> {
let mut error = BytesMut::new(); let mut error = BytesMut::new();
@@ -339,6 +339,7 @@ pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
res res
} }
/// Create a DataRow message.
pub fn data_row(row: &Vec<String>) -> BytesMut { pub fn data_row(row: &Vec<String>) -> BytesMut {
let mut res = BytesMut::new(); let mut res = BytesMut::new();
let mut data_row = BytesMut::new(); let mut data_row = BytesMut::new();
@@ -358,6 +359,7 @@ pub fn data_row(row: &Vec<String>) -> BytesMut {
res res
} }
/// Create a CommandComplete message.
pub fn command_complete(command: &str) -> BytesMut { pub fn command_complete(command: &str) -> BytesMut {
let cmd = BytesMut::from(format!("{}\0", command).as_bytes()); let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
let mut res = BytesMut::new(); let mut res = BytesMut::new();

View File

@@ -1,24 +1,23 @@
/// Pooling and failover and banlist. /// Pooling, failover and banlist.
use async_trait::async_trait; use async_trait::async_trait;
use bb8::{ManageConnection, Pool, PooledConnection}; use bb8::{ManageConnection, Pool, PooledConnection};
use bytes::BytesMut; use bytes::BytesMut;
use chrono::naive::NaiveDateTime; use chrono::naive::NaiveDateTime;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use crate::config::{get_config, Address, Role, User}; use crate::config::{get_config, Address, Role, User};
use crate::errors::Error; use crate::errors::Error;
use crate::server::Server; use crate::server::Server;
use crate::stats::Reporter; use crate::stats::Reporter;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
// Banlist: bad servers go in here.
pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>; pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>; pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
/// The globally accessible connection pool.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ConnectionPool { pub struct ConnectionPool {
databases: Vec<Vec<Pool<ServerPool>>>, databases: Vec<Vec<Pool<ServerPool>>>,
@@ -29,7 +28,7 @@ pub struct ConnectionPool {
} }
impl ConnectionPool { impl ConnectionPool {
/// Construct the connection pool from a config file. /// Construct the connection pool from the configuration.
pub async fn from_config( pub async fn from_config(
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
stats: Reporter, stats: Reporter,
@@ -204,15 +203,12 @@ impl ConnectionPool {
} }
while allowed_attempts > 0 { while allowed_attempts > 0 {
// Round-robin each client's queries. // Round-robin replicas.
// If a client only sends one query and then disconnects, it doesn't matter
// which replica it'll go to.
self.round_robin += 1; self.round_robin += 1;
let index = self.round_robin % addresses.len(); let index = self.round_robin % addresses.len();
let address = &addresses[index]; let address = &addresses[index];
self.stats.client_waiting(process_id, address.id);
// Make sure you're getting a primary or a replica // Make sure you're getting a primary or a replica
// as per request. If no specific role is requested, the first // as per request. If no specific role is requested, the first
// available will be chosen. // available will be chosen.
@@ -226,6 +222,9 @@ impl ConnectionPool {
continue; continue;
} }
// Indicate we're waiting on a server connection from a pool.
self.stats.client_waiting(process_id, address.id);
// Check if we can connect // Check if we can connect
let mut conn = match self.databases[shard][index].get().await { let mut conn = match self.databases[shard][index].get().await {
Ok(conn) => conn, Ok(conn) => conn,
@@ -239,7 +238,7 @@ impl ConnectionPool {
} }
}; };
// // Check if this server is alive with a health check // // Check if this server is alive with a health check.
let server = &mut *conn; let server = &mut *conn;
let healthcheck_timeout = get_config().general.healthcheck_timeout; let healthcheck_timeout = get_config().general.healthcheck_timeout;
@@ -251,7 +250,7 @@ impl ConnectionPool {
) )
.await .await
{ {
// Check if health check succeeded // Check if health check succeeded.
Ok(res) => match res { Ok(res) => match res {
Ok(_) => { Ok(_) => {
self.stats self.stats
@@ -259,8 +258,11 @@ impl ConnectionPool {
self.stats.server_idle(conn.process_id(), address.id); self.stats.server_idle(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
// Health check failed.
Err(_) => { Err(_) => {
error!("Banning replica {} because of failed health check", index); error!("Banning replica {} because of failed health check", index);
// Don't leave a bad connection in the pool. // Don't leave a bad connection in the pool.
server.mark_bad(); server.mark_bad();
@@ -271,7 +273,8 @@ impl ConnectionPool {
continue; continue;
} }
}, },
// Health check never came back, database is really really down
// Health check timed out.
Err(_) => { Err(_) => {
error!("Banning replica {} because of health check timeout", index); error!("Banning replica {} because of health check timeout", index);
// Don't leave a bad connection in the pool. // Don't leave a bad connection in the pool.
@@ -358,14 +361,18 @@ impl ConnectionPool {
} }
} }
/// Get the number of configured shards.
pub fn shards(&self) -> usize { pub fn shards(&self) -> usize {
self.databases.len() self.databases.len()
} }
/// Get the number of servers (primary and replicas)
/// configured for a shard.
pub fn servers(&self, shard: usize) -> usize { pub fn servers(&self, shard: usize) -> usize {
self.addresses[shard].len() self.addresses[shard].len()
} }
/// Get the total number of servers (databases) we are connected to.
pub fn databases(&self) -> usize { pub fn databases(&self) -> usize {
let mut databases = 0; let mut databases = 0;
for shard in 0..self.shards() { for shard in 0..self.shards() {
@@ -374,15 +381,18 @@ impl ConnectionPool {
databases databases
} }
/// Get pool state for a particular shard server as reported by bb8.
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State { pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
self.databases[shard][server].state() self.databases[shard][server].state()
} }
/// Get the address information for a shard server.
pub fn address(&self, shard: usize, server: usize) -> &Address { pub fn address(&self, shard: usize, server: usize) -> &Address {
&self.addresses[shard][server] &self.addresses[shard][server]
} }
} }
/// Wrapper for the bb8 connection pool.
pub struct ServerPool { pub struct ServerPool {
address: Address, address: Address,
user: User, user: User,
@@ -427,6 +437,7 @@ impl ManageConnection for ServerPool {
let process_id = rand::random::<i32>(); let process_id = rand::random::<i32>();
self.stats.server_login(process_id, self.address.id); self.stats.server_login(process_id, self.address.id);
// Connect to the PostgreSQL server.
match Server::startup( match Server::startup(
&self.address, &self.address,
&self.user, &self.user,

View File

@@ -1,5 +1,3 @@
use crate::config::{get_config, Role};
use crate::sharding::{Sharder, ShardingFunction};
/// Route queries automatically based on explicitely requested /// Route queries automatically based on explicitely requested
/// or implied query characteristics. /// or implied query characteristics.
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
@@ -10,6 +8,10 @@ use sqlparser::ast::Statement::{Query, StartTransaction};
use sqlparser::dialect::PostgreSqlDialect; use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser; use sqlparser::parser::Parser;
use crate::config::{get_config, Role};
use crate::sharding::{Sharder, ShardingFunction};
/// Regexes used to parse custom commands.
const CUSTOM_SQL_REGEXES: [&str; 5] = [ const CUSTOM_SQL_REGEXES: [&str; 5] = [
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$", r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$", r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
@@ -18,6 +20,7 @@ const CUSTOM_SQL_REGEXES: [&str; 5] = [
r"(?i)^ *SHOW SERVER ROLE *;? *$", r"(?i)^ *SHOW SERVER ROLE *;? *$",
]; ];
/// Custom commands.
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
pub enum Command { pub enum Command {
SetShardingKey, SetShardingKey,
@@ -27,37 +30,39 @@ pub enum Command {
ShowServerRole, ShowServerRole,
} }
// Quick test /// Quickly test for match when a query is received.
static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new(); static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
// Capture value // Get the value inside the custom command.
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new(); static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
/// The query router.
pub struct QueryRouter { pub struct QueryRouter {
// By default, queries go here, unless we have better information /// By default, queries go here, unless we have better information
// about what the client wants. /// about what the client wants.
default_server_role: Option<Role>, default_server_role: Option<Role>,
// Number of shards in the cluster. /// Number of shards in the cluster.
shards: usize, shards: usize,
// Which shard we should be talking to right now. /// Which shard we should be talking to right now.
active_shard: Option<usize>, active_shard: Option<usize>,
// Should we be talking to a primary or a replica? /// Which server should we be talking to.
active_role: Option<Role>, active_role: Option<Role>,
// Include the primary into the replica pool? /// Include the primary into the replica pool for reads.
primary_reads_enabled: bool, primary_reads_enabled: bool,
// Should we try to parse queries? /// Should we try to parse queries to route them to replicas or primary automatically.
query_parser_enabled: bool, query_parser_enabled: bool,
// Which sharding function are we using? /// Which sharding function we're using.
sharding_function: ShardingFunction, sharding_function: ShardingFunction,
} }
impl QueryRouter { impl QueryRouter {
/// One-time initialization of regexes.
pub fn setup() -> bool { pub fn setup() -> bool {
let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) { let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) {
Ok(rgx) => rgx, Ok(rgx) => rgx,
@@ -88,6 +93,7 @@ impl QueryRouter {
} }
} }
/// Create a new instance of the query router. Each client gets its own.
pub fn new() -> QueryRouter { pub fn new() -> QueryRouter {
let config = get_config(); let config = get_config();
@@ -120,6 +126,7 @@ impl QueryRouter {
pub fn try_execute_command(&mut self, mut buf: BytesMut) -> Option<(Command, String)> { pub fn try_execute_command(&mut self, mut buf: BytesMut) -> Option<(Command, String)> {
let code = buf.get_u8() as char; let code = buf.get_u8() as char;
// Only simple protocol supported for commands.
if code != 'Q' { if code != 'Q' {
return None; return None;
} }
@@ -158,8 +165,7 @@ impl QueryRouter {
// figured out a better way just yet. I think I can write a single Regex // figured out a better way just yet. I think I can write a single Regex
// that matches all 5 custom SQL patterns, but maybe that's not very legible? // that matches all 5 custom SQL patterns, but maybe that's not very legible?
// //
// I think this is faster than running the Regex engine 5 times, so // I think this is faster than running the Regex engine 5 times.
// this is a strong maybe for me so far.
match regex_list[matches[0]].captures(&query) { match regex_list[matches[0]].captures(&query) {
Some(captures) => match captures.get(1) { Some(captures) => match captures.get(1) {
Some(value) => value.as_str().to_string(), Some(value) => value.as_str().to_string(),
@@ -221,7 +227,6 @@ impl QueryRouter {
} }
"default" => { "default" => {
// TODO: reset query parser to default here.
self.active_role = self.default_server_role; self.active_role = self.default_server_role;
self.query_parser_enabled = get_config().query_router.query_parser_enabled; self.query_parser_enabled = get_config().query_router.query_parser_enabled;
self.active_role self.active_role
@@ -243,12 +248,14 @@ impl QueryRouter {
let len = buf.get_i32() as usize; let len = buf.get_i32() as usize;
let query = match code { let query = match code {
// Query
'Q' => { 'Q' => {
let query = String::from_utf8_lossy(&buf[..len - 5]).to_string(); let query = String::from_utf8_lossy(&buf[..len - 5]).to_string();
debug!("Query: '{}'", query); debug!("Query: '{}'", query);
query query
} }
// Parse (prepared statement)
'P' => { 'P' => {
let mut start = 0; let mut start = 0;
let mut end; let mut end;
@@ -271,6 +278,7 @@ impl QueryRouter {
query.replace("$", "") // Remove placeholders turning them into "values" query.replace("$", "") // Remove placeholders turning them into "values"
} }
_ => return false, _ => return false,
}; };
@@ -334,6 +342,7 @@ impl QueryRouter {
self.query_parser_enabled self.query_parser_enabled
} }
/// Allows to toggle primary reads in tests.
#[allow(dead_code)] #[allow(dead_code)]
pub fn toggle_primary_reads(&mut self, value: bool) { pub fn toggle_primary_reads(&mut self, value: bool) {
self.primary_reads_enabled = value; self.primary_reads_enabled = value;

311
src/scram.rs Normal file
View File

@@ -0,0 +1,311 @@
// SCRAM authentication...largely copy/pasted from
// https://github.com/sfackler/rust-postgres/.
use bytes::BytesMut;
use hmac::{Hmac, Mac};
use rand::{self, Rng};
use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
use std::fmt::Write;
use crate::constants::*;
use crate::errors::Error;
/// Normalize a password per SASLprep, falling back to the raw bytes when
/// the input is not valid UTF-8 or saslprep rejects it (matches libpq's
/// lenient behavior).
fn normalize(pass: &[u8]) -> Vec<u8> {
    if let Ok(utf8) = std::str::from_utf8(pass) {
        match stringprep::saslprep(utf8) {
            Ok(prepped) => prepped.into_owned().into_bytes(),
            Err(_) => utf8.as_bytes().to_vec(),
        }
    } else {
        pass.to_vec()
    }
}
/// Client-side state machine for a SCRAM-SHA-256 authentication exchange.
pub struct ScramSha256 {
    /// Plaintext password from the pool configuration.
    password: String,
    /// Iterated-HMAC-derived key; filled in by `update()` once the server
    /// sends its salt and iteration count.
    salted_password: [u8; 32],
    /// Concatenated exchange transcript, signed/verified in `finish()`.
    auth_message: String,
    /// The most recently built message, ready to send to the server.
    message: BytesMut,
    /// Random client nonce generated at construction.
    nonce: String,
}
impl ScramSha256 {
    /// Start a new exchange: generate a random printable-ASCII nonce and
    /// build the client-first message.
    pub fn new(password: &str) -> ScramSha256 {
        let mut rng = rand::thread_rng();
        let nonce = (0..NONCE_LENGTH)
            .map(|_| {
                // Printable ASCII in [0x21, 0x7e).
                let mut v = rng.gen_range(0x21u8..0x7e);

                // ',' (0x2c) separates SCRAM fields, so substitute '~'.
                if v == 0x2c {
                    v = 0x7e
                }

                v as char
            })
            .collect::<String>();

        Self::from_nonce(password, &nonce)
    }

    /// Build the exchange from a known nonce. Used by tests to replay a
    /// recorded conversation deterministically.
    pub fn from_nonce(password: &str, nonce: &str) -> ScramSha256 {
        // "n,," is the GS2 header (no channel binding), followed by the
        // client-first-message-bare: empty username ("n=") + client nonce.
        let message = BytesMut::from(&format!("{}n=,r={}", "n,,", nonce).as_bytes()[..]);

        ScramSha256 {
            password: password.to_string(),
            nonce: String::from(nonce),
            message,
            salted_password: [0u8; 32],
            auth_message: String::new(),
        }
    }

    /// Get the message to send to the server next.
    pub fn message(&mut self) -> BytesMut {
        self.message.clone()
    }

    /// Handle the server-first message: derive the salted password, compute
    /// the client proof and build the client-final message to send back.
    pub fn update(&mut self, message: &BytesMut) -> Result<BytesMut, Error> {
        let server_message = Message::parse(message)?;

        // The server nonce must be an extension of the nonce we sent.
        if !server_message.nonce.starts_with(&self.nonce) {
            // trace!("Bad server nonce");
            return Err(Error::ProtocolSyncError);
        }

        let salt = match base64::decode(&server_message.salt) {
            Ok(salt) => salt,
            Err(_) => return Err(Error::ProtocolSyncError),
        };

        // SaltedPassword := Hi(Normalize(password), salt, iterations)
        let salted_password = Self::hi(
            &normalize(&self.password.as_bytes()[..]),
            &salt,
            server_message.iterations,
        );

        // Kept for the server-signature verification in finish().
        self.salted_password = salted_password;

        // ClientKey := HMAC(SaltedPassword, "Client Key")
        let mut hmac = Hmac::<Sha256>::new_from_slice(&salted_password)
            .expect("HMAC is able to accept all key sizes");
        hmac.update(b"Client Key");
        let client_key = hmac.finalize().into_bytes();

        // StoredKey := H(ClientKey)
        let mut hash = Sha256::default();
        hash.update(client_key.as_slice());
        let stored_key = hash.finalize_fixed();

        // Channel-binding input: just the GS2 header, base64-encoded
        // ("biws" when no binding is used).
        let mut cbind_input = vec![];
        cbind_input.extend("n,,".as_bytes());
        let cbind_input = base64::encode(&cbind_input);

        // client-final-message-without-proof: c=<cbind>,r=<full nonce>
        self.message.clear();
        write!(
            &mut self.message,
            "c={},r={}",
            cbind_input, server_message.nonce
        )
        .unwrap();

        // AuthMessage := client-first-bare "," server-first ","
        //                client-final-without-proof
        let auth_message = format!(
            "n=,r={},{},{}",
            self.nonce,
            String::from_utf8_lossy(&message[..]),
            String::from_utf8_lossy(&self.message[..])
        );

        // ClientSignature := HMAC(StoredKey, AuthMessage)
        let mut hmac = Hmac::<Sha256>::new_from_slice(&stored_key)
            .expect("HMAC is able to accept all key sizes");
        hmac.update(auth_message.as_bytes());
        let client_signature = hmac.finalize().into_bytes();

        // ClientProof := ClientKey XOR ClientSignature
        let mut client_proof = client_key;
        for (proof, signature) in client_proof.iter_mut().zip(client_signature) {
            *proof ^= signature;
        }

        // Append the proof: ...,p=<base64 proof>
        write!(&mut self.message, ",p={}", base64::encode(&*client_proof)).unwrap();

        self.auth_message = auth_message;

        Ok(self.message.clone())
    }

    /// Handle the server-final message: verify the server's signature,
    /// which proves the server also knows the (salted) password.
    pub fn finish(&mut self, message: &BytesMut) -> Result<(), Error> {
        let final_message = FinalMessage::parse(message)?;

        let verifier = match base64::decode(&final_message.value) {
            Ok(verifier) => verifier,
            Err(_) => return Err(Error::ProtocolSyncError),
        };

        // ServerKey := HMAC(SaltedPassword, "Server Key")
        let mut hmac = Hmac::<Sha256>::new_from_slice(&self.salted_password)
            .expect("HMAC is able to accept all key sizes");
        hmac.update(b"Server Key");
        let server_key = hmac.finalize().into_bytes();

        // ServerSignature := HMAC(ServerKey, AuthMessage); must match the
        // verifier the server sent.
        let mut hmac = Hmac::<Sha256>::new_from_slice(&server_key)
            .expect("HMAC is able to accept all key sizes");
        hmac.update(self.auth_message.as_bytes());

        match hmac.verify_slice(&verifier) {
            Ok(_) => Ok(()),
            Err(_) => return Err(Error::ServerError),
        }
    }

    /// The SCRAM "Hi" function: iterated HMAC key derivation (PBKDF2 with
    /// HMAC-SHA-256 as the PRF). Adapted from:
    /// <https://github.com/sfackler/rust-postgres/blob/c3a029e60c1c0bd0be947049859b8fa5bd5ac220/postgres-protocol/src/authentication/sasl.rs#L35>
    fn hi(str: &[u8], salt: &[u8], i: u32) -> [u8; 32] {
        // U1 := HMAC(key, salt || INT(1))
        let mut hmac =
            Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
        hmac.update(salt);
        hmac.update(&[0, 0, 0, 1]);
        let mut prev = hmac.finalize().into_bytes();

        let mut hi = prev;

        // Hi := U1 XOR U2 XOR ... XOR Ui
        for _ in 1..i {
            let mut hmac = Hmac::<Sha256>::new_from_slice(str).expect("already checked above");
            hmac.update(&prev);
            prev = hmac.finalize().into_bytes();

            for (hi, prev) in hi.iter_mut().zip(prev) {
                *hi ^= prev;
            }
        }

        hi.into()
    }
}
/// Parsed server-first message: `r=<nonce>,s=<salt>,i=<iterations>`.
#[derive(Default, Debug)]
struct Message {
    // Combined client+server nonce.
    nonce: String,
    // Base64-encoded salt.
    salt: String,
    // Key-derivation iteration count.
    iterations: u32,
}
impl Message {
    /// Parse the SASL server-first message:
    /// `r=<nonce>,s=<base64 salt>,i=<iterations>`.
    ///
    /// Returns `Error::ProtocolSyncError` on any malformed input.
    ///
    /// Bug fix: the original scanned with `message[i] != b',' && i < len`,
    /// indexing BEFORE the bounds check — a message whose field ran to the
    /// end without a trailing ',' panicked with an out-of-bounds index, and
    /// the subsequent `&message[i..]` slices could also panic. Bounds are
    /// now checked first and truncated input returns an error instead.
    fn parse(message: &BytesMut) -> Result<Message, Error> {
        if !message.starts_with(b"r=") {
            return Err(Error::ProtocolSyncError);
        }

        // Nonce: everything between "r=" and the next ','.
        let mut i = 2;
        while i < message.len() && message[i] != b',' {
            i += 1;
        }
        let nonce = String::from_utf8_lossy(&message[2..i]).to_string();

        // Skip the ',' and require the salt field.
        i += 1;
        if i >= message.len() || !&message[i..].starts_with(b"s=") {
            return Err(Error::ProtocolSyncError);
        }

        // Skip the "s=".
        i += 2;
        let s = i;
        while i < message.len() && message[i] != b',' {
            i += 1;
        }
        let salt = String::from_utf8_lossy(&message[s..i]).to_string();

        // Skip the ',' and require the iteration count.
        i += 1;
        if i >= message.len() || !&message[i..].starts_with(b"i=") {
            return Err(Error::ProtocolSyncError);
        }

        // Skip the "i=" and parse the remainder as the iteration count.
        i += 2;
        let iterations = match String::from_utf8_lossy(&message[i..]).parse::<u32>() {
            Ok(it) => it,
            Err(_) => return Err(Error::ProtocolSyncError),
        };

        Ok(Message {
            nonce,
            salt,
            iterations,
        })
    }
}
/// Parsed server-final message: `v=<base64 server signature>`.
struct FinalMessage {
    // Base64-encoded ServerSignature to verify against our own computation.
    value: String,
}
impl FinalMessage {
pub fn parse(message: &BytesMut) -> Result<FinalMessage, Error> {
if !message.starts_with(b"v=") {
return Err(Error::ProtocolSyncError);
}
Ok(FinalMessage {
value: String::from_utf8_lossy(&message[2..]).to_string(),
})
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// The example server-first message from RFC 5802 splits into nonce,
    /// salt and iteration count.
    #[test]
    fn parse_server_first_message() {
        let message = BytesMut::from(
            &"r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096".as_bytes()[..],
        );

        let message = Message::parse(&message).unwrap();
        assert_eq!(message.nonce, "fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j");
        assert_eq!(message.salt, "QSXCR+Q6sek8bf92");
        assert_eq!(message.iterations, 4096);
    }

    /// A server-final message yields the base64 verifier after "v=".
    #[test]
    fn parse_server_last_message() {
        let f = FinalMessage::parse(&BytesMut::from(
            &"v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw".as_bytes()[..],
        ))
        .unwrap();
        assert_eq!(
            f.value,
            "U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw".to_string()
        );
    }

    // Recorded auth exchange from psql: with a fixed nonce, every message
    // we produce must match the capture byte-for-byte.
    #[test]
    fn exchange() {
        let password = "foobar";
        let nonce = "9IZ2O01zb9IgiIZ1WJ/zgpJB";

        let client_first = "n,,n=,r=9IZ2O01zb9IgiIZ1WJ/zgpJB";
        let server_first =
            "r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\
             =4096";
        let client_final =
            "c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\
             1NTlQYNs5BTeQjdHdk7lOflDo5re2an8=";
        let server_final = "v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw=";

        let mut scram = ScramSha256::from_nonce(password, nonce);

        let message = scram.message();
        assert_eq!(std::str::from_utf8(&message).unwrap(), client_first);

        let result = scram
            .update(&BytesMut::from(&server_first.as_bytes()[..]))
            .unwrap();
        assert_eq!(std::str::from_utf8(&result).unwrap(), client_final);

        scram
            .finish(&BytesMut::from(&server_final.as_bytes()[..]))
            .unwrap();
    }
}

View File

@@ -1,6 +1,6 @@
/// Implementation of the PostgreSQL server (database) protocol.
/// Here we are pretending to the a Postgres client.
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
///! Implementation of the PostgreSQL server (database) protocol.
///! Here we are pretending to the a Postgres client.
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
use tokio::io::{AsyncReadExt, BufReader}; use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{ use tokio::net::{
@@ -12,48 +12,52 @@ use crate::config::{Address, User};
use crate::constants::*; use crate::constants::*;
use crate::errors::Error; use crate::errors::Error;
use crate::messages::*; use crate::messages::*;
use crate::scram::ScramSha256;
use crate::stats::Reporter; use crate::stats::Reporter;
use crate::ClientServerMap; use crate::ClientServerMap;
/// Server state. /// Server state.
pub struct Server { pub struct Server {
// Server host, e.g. localhost, /// Server host, e.g. localhost,
// port, e.g. 5432, and role, e.g. primary or replica. /// port, e.g. 5432, and role, e.g. primary or replica.
address: Address, address: Address,
// Buffered read socket. /// Buffered read socket.
read: BufReader<OwnedReadHalf>, read: BufReader<OwnedReadHalf>,
// Unbuffered write socket (our client code buffers). /// Unbuffered write socket (our client code buffers).
write: OwnedWriteHalf, write: OwnedWriteHalf,
// Our server response buffer. We buffer data before we give it to the client. /// Our server response buffer. We buffer data before we give it to the client.
buffer: BytesMut, buffer: BytesMut,
// Server information the server sent us over on startup. /// Server information the server sent us over on startup.
server_info: BytesMut, server_info: BytesMut,
// Backend id and secret key used for query cancellation. /// Backend id and secret key used for query cancellation.
process_id: i32, process_id: i32,
secret_key: i32, secret_key: i32,
// Is the server inside a transaction or idle. /// Is the server inside a transaction or idle.
in_transaction: bool, in_transaction: bool,
// Is there more data for the client to read. /// Is there more data for the client to read.
data_available: bool, data_available: bool,
// Is the server broken? We'll remote it from the pool if so. /// Is the server broken? We'll remote it from the pool if so.
bad: bool, bad: bool,
// Mapping of clients and servers used for query cancellation. /// Mapping of clients and servers used for query cancellation.
client_server_map: ClientServerMap, client_server_map: ClientServerMap,
// Server connected at. /// Server connected at.
connected_at: chrono::naive::NaiveDateTime, connected_at: chrono::naive::NaiveDateTime,
// Reports various metrics, e.g. data sent & received. /// Reports various metrics, e.g. data sent & received.
stats: Reporter, stats: Reporter,
/// Application name using the server at the moment.
application_name: String,
} }
impl Server { impl Server {
@@ -77,7 +81,7 @@ impl Server {
trace!("Sending StartupMessage"); trace!("Sending StartupMessage");
// Send the startup packet telling the server we're a normal Postgres client. // StartupMessage
startup(&mut stream, &user.name, database).await?; startup(&mut stream, &user.name, database).await?;
let mut server_info = BytesMut::new(); let mut server_info = BytesMut::new();
@@ -86,6 +90,8 @@ impl Server {
// We'll be handling multiple packets, but they will all be structured the same. // We'll be handling multiple packets, but they will all be structured the same.
// We'll loop here until this exchange is complete. // We'll loop here until this exchange is complete.
let mut scram = ScramSha256::new(&user.password);
loop { loop {
let code = match stream.read_u8().await { let code = match stream.read_u8().await {
Ok(code) => code as char, Ok(code) => code as char,
@@ -127,6 +133,83 @@ impl Server {
AUTHENTICATION_SUCCESSFUL => (), AUTHENTICATION_SUCCESSFUL => (),
SASL => {
debug!("Starting SASL authentication");
let sasl_len = (len - 8) as usize;
let mut sasl_auth = vec![0u8; sasl_len];
match stream.read_exact(&mut sasl_auth).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
if sasl_type == SCRAM_SHA_256 {
debug!("Using {}", SCRAM_SHA_256);
// Send client message
let sasl_response = scram.message();
let mut res = BytesMut::new();
res.put_u8(b'p');
res.put_i32(
4 + SCRAM_SHA_256.len() as i32
+ 1
+ sasl_response.len() as i32
+ 4,
);
res.put_slice(&format!("{}\0", SCRAM_SHA_256).as_bytes()[..]);
res.put_i32(sasl_response.len() as i32);
res.put(sasl_response);
write_all(&mut stream, res).await?;
} else {
error!("Unsupported SCRAM version: {}", sasl_type);
return Err(Error::ServerError);
}
}
SASL_CONTINUE => {
trace!("Continuing SASL");
let mut sasl_data = vec![0u8; (len - 8) as usize];
match stream.read_exact(&mut sasl_data).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
let msg = BytesMut::from(&sasl_data[..]);
let sasl_response = scram.update(&msg)?;
let mut res = BytesMut::new();
res.put_u8(b'p');
res.put_i32(4 + sasl_response.len() as i32);
res.put(sasl_response);
write_all(&mut stream, res).await?;
}
SASL_FINAL => {
trace!("Final SASL");
let mut sasl_final = vec![0u8; len as usize - 8];
match stream.read_exact(&mut sasl_final).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
match scram.finish(&BytesMut::from(&sasl_final[..])) {
Ok(_) => {
debug!("SASL authentication successful");
}
Err(err) => {
debug!("SASL authentication failed");
return Err(err);
}
};
}
_ => { _ => {
error!("Unsupported authentication mechanism: {}", auth_code); error!("Unsupported authentication mechanism: {}", auth_code);
return Err(Error::ServerError); return Err(Error::ServerError);
@@ -187,7 +270,7 @@ impl Server {
// BackendKeyData // BackendKeyData
'K' => { 'K' => {
// The frontend must save these values if it wishes to be able to issue CancelRequest messages later. // The frontend must save these values if it wishes to be able to issue CancelRequest messages later.
// See: https://www.postgresql.org/docs/12/protocol-message-formats.html // See: <https://www.postgresql.org/docs/12/protocol-message-formats.html>.
process_id = match stream.read_i32().await { process_id = match stream.read_i32().await {
Ok(id) => id, Ok(id) => id,
Err(_) => return Err(Error::SocketError), Err(_) => return Err(Error::SocketError),
@@ -208,11 +291,9 @@ impl Server {
Err(_) => return Err(Error::SocketError), Err(_) => return Err(Error::SocketError),
}; };
// This is the last step in the client-server connection setup,
// and indicates the server is ready for to query it.
let (read, write) = stream.into_split(); let (read, write) = stream.into_split();
return Ok(Server { let mut server = Server {
address: address.clone(), address: address.clone(),
read: BufReader::new(read), read: BufReader::new(read),
write: write, write: write,
@@ -226,7 +307,12 @@ impl Server {
client_server_map: client_server_map, client_server_map: client_server_map,
connected_at: chrono::offset::Utc::now().naive_utc(), connected_at: chrono::offset::Utc::now().naive_utc(),
stats: stats, stats: stats,
}); application_name: String::new(),
};
server.set_name("pgcat").await?;
return Ok(server);
} }
// We have an unexpected message from the server during this exchange. // We have an unexpected message from the server during this exchange.
@@ -342,8 +428,7 @@ impl Server {
// More data is available after this message, this is not the end of the reply. // More data is available after this message, this is not the end of the reply.
self.data_available = true; self.data_available = true;
// Don't flush yet, the more we buffer, the faster this goes... // Don't flush yet, the more we buffer, the faster this goes...up to a limit.
// up to a limit of course.
if self.buffer.len() >= 8196 { if self.buffer.len() >= 8196 {
break; break;
} }
@@ -411,7 +496,7 @@ impl Server {
/// Indicate that this server connection cannot be re-used and must be discarded. /// Indicate that this server connection cannot be re-used and must be discarded.
pub fn mark_bad(&mut self) { pub fn mark_bad(&mut self) {
error!("Server marked bad"); error!("Server {:?} marked bad", self.address);
self.bad = true; self.bad = true;
} }
@@ -451,9 +536,14 @@ impl Server {
/// A shorthand for `SET application_name = $1`. /// A shorthand for `SET application_name = $1`.
#[allow(dead_code)] #[allow(dead_code)]
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> { pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
Ok(self if self.application_name != name {
.query(&format!("SET application_name = '{}'", name)) self.application_name = name.to_string();
.await?) Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?)
} else {
Ok(())
}
} }
/// Get the servers address. /// Get the servers address.
@@ -462,6 +552,7 @@ impl Server {
self.address.clone() self.address.clone()
} }
/// Get the server's unique identifier.
pub fn process_id(&self) -> i32 { pub fn process_id(&self) -> i32 {
self.process_id self.process_id
} }
@@ -481,9 +572,10 @@ impl Drop for Server {
match self.write.try_write(&bytes) { match self.write.try_write(&bytes) {
Ok(_) => (), Ok(_) => (),
Err(_) => (), Err(_) => debug!("Dirty shutdown"),
}; };
// Should not matter.
self.bad = true; self.bad = true;
let now = chrono::offset::Utc::now().naive_utc(); let now = chrono::offset::Utc::now().naive_utc();

View File

@@ -1,20 +1,27 @@
/// Implements various sharding functions.
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
// https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20 /// See: <https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20>.
const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD; const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD;
/// The sharding functions we support.
#[derive(Debug, PartialEq, Copy, Clone)] #[derive(Debug, PartialEq, Copy, Clone)]
pub enum ShardingFunction { pub enum ShardingFunction {
PgBigintHash, PgBigintHash,
Sha1, Sha1,
} }
/// The sharder.
pub struct Sharder { pub struct Sharder {
/// Number of shards in the cluster.
shards: usize, shards: usize,
/// The sharding function in use.
sharding_function: ShardingFunction, sharding_function: ShardingFunction,
} }
impl Sharder { impl Sharder {
/// Create new instance of the sharder.
pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder { pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder {
Sharder { Sharder {
shards, shards,
@@ -22,6 +29,7 @@ impl Sharder {
} }
} }
/// Compute the shard given sharding key.
pub fn shard(&self, key: i64) -> usize { pub fn shard(&self, key: i64) -> usize {
match self.sharding_function { match self.sharding_function {
ShardingFunction::PgBigintHash => self.pg_bigint_hash(key), ShardingFunction::PgBigintHash => self.pg_bigint_hash(key),
@@ -31,7 +39,7 @@ impl Sharder {
/// Hash function used by Postgres to determine which partition /// Hash function used by Postgres to determine which partition
/// to put the row in when using HASH(column) partitioning. /// to put the row in when using HASH(column) partitioning.
/// Source: https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631 /// Source: <https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631>.
/// Supports only 1 bigint at the moment, but we can add more later. /// Supports only 1 bigint at the moment, but we can add more later.
fn pg_bigint_hash(&self, key: i64) -> usize { fn pg_bigint_hash(&self, key: i64) -> usize {
let mut lohalf = key as u32; let mut lohalf = key as u32;
@@ -119,6 +127,7 @@ impl Sharder {
a a
} }
#[inline]
fn pg_u32_hash(k: u32) -> u64 { fn pg_u32_hash(k: u32) -> u64 {
let mut a: u32 = 0x9e3779b9 as u32 + std::mem::size_of::<u32>() as u32 + 3923095 as u32; let mut a: u32 = 0x9e3779b9 as u32 + std::mem::size_of::<u32>() as u32 + 3923095 as u32;
let mut b = a; let mut b = a;

View File

@@ -5,12 +5,12 @@ use parking_lot::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
// Latest stats updated every second; used in SHOW STATS and other admin commands. /// Latest stats updated every second; used in SHOW STATS and other admin commands.
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> = static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
Lazy::new(|| Mutex::new(HashMap::new())); Lazy::new(|| Mutex::new(HashMap::new()));
// Statistics period used for average calculations. /// Statistics period used for average calculations.
// 15 seconds. /// 15 seconds.
static STAT_PERIOD: u64 = 15000; static STAT_PERIOD: u64 = 15000;
/// The names for the events reported /// The names for the events reported

View File

@@ -2,3 +2,4 @@ source "https://rubygems.org"
gem "pg" gem "pg"
gem "activerecord" gem "activerecord"
gem "rubocop"

View File

@@ -11,13 +11,33 @@ GEM
i18n (>= 1.6, < 2) i18n (>= 1.6, < 2)
minitest (>= 5.1) minitest (>= 5.1)
tzinfo (~> 2.0) tzinfo (~> 2.0)
ast (2.4.2)
concurrent-ruby (1.1.9) concurrent-ruby (1.1.9)
i18n (1.10.0) i18n (1.10.0)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
minitest (5.15.0) minitest (5.15.0)
parallel (1.22.1)
parser (3.1.2.0)
ast (~> 2.4.1)
pg (1.3.2) pg (1.3.2)
rainbow (3.1.1)
regexp_parser (2.3.1)
rexml (3.2.5)
rubocop (1.29.0)
parallel (~> 1.10)
parser (>= 3.1.0.0)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.8, < 3.0)
rexml (>= 3.2.5, < 4.0)
rubocop-ast (>= 1.17.0, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 1.4.0, < 3.0)
rubocop-ast (1.17.0)
parser (>= 3.1.1.0)
ruby-progressbar (1.11.0)
tzinfo (2.0.4) tzinfo (2.0.4)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
unicode-display_width (2.1.0)
PLATFORMS PLATFORMS
x86_64-linux x86_64-linux
@@ -25,6 +45,7 @@ PLATFORMS
DEPENDENCIES DEPENDENCIES
activerecord activerecord
pg pg
rubocop
BUNDLED WITH BUNDLED WITH
2.3.7 2.3.7

View File

@@ -1,6 +1,9 @@
# frozen_string_literal: true # frozen_string_literal: true
require 'active_record' require 'active_record'
require 'pg'
$stdout.sync = true
# Uncomment these two to see all queries. # Uncomment these two to see all queries.
# ActiveRecord.verbose_query_logs = true # ActiveRecord.verbose_query_logs = true
@@ -13,6 +16,7 @@ ActiveRecord::Base.establish_connection(
username: 'sharding_user', username: 'sharding_user',
password: 'sharding_user', password: 'sharding_user',
database: 'rails_dev', database: 'rails_dev',
application_name: 'testing_pgcat',
prepared_statements: false, # Transaction mode prepared_statements: false, # Transaction mode
advisory_locks: false # Same advisory_locks: false # Same
) )
@@ -110,3 +114,17 @@ begin
rescue ActiveRecord::StatementInvalid rescue ActiveRecord::StatementInvalid
puts 'OK' puts 'OK'
end end
# Test evil clients.
#
# Simulates a client that opens a transaction and then disconnects without
# ever committing or rolling back. NOTE(review): presumably the pooler is
# expected to clean up the abandoned server-side transaction so later
# clients get a healthy connection — confirm against pgcat's cleanup logic.
def poorly_behaved_client
  conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/rails_dev?application_name=testing_pgcat")
  conn.async_exec 'BEGIN'
  conn.async_exec 'SELECT 1'

  # Close abruptly, leaving the transaction open.
  conn.close
  puts 'Bad client ok'
end

# Repeat enough times to surface any connection/state leak in the pool.
25.times do
  poorly_behaved_client
end