Compare commits

..

1 Commits

Author SHA1 Message Date
Lev Kokotov
9be2a17742 pgcat deb package 2023-08-08 11:06:59 -07:00
62 changed files with 1367 additions and 4094 deletions

View File

@@ -9,7 +9,7 @@ jobs:
# Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub.
# See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor
docker:
- image: ghcr.io/postgresml/pgcat-ci:latest
- image: ghcr.io/levkk/pgcat-ci:1.67
environment:
RUST_LOG: info
LLVM_PROFILE_FILE: /tmp/pgcat-%m-%p.profraw
@@ -63,9 +63,6 @@ jobs:
- run:
name: "Lint"
command: "cargo fmt --check"
- run:
name: "Clippy"
command: "cargo clippy --all --all-targets -- -Dwarnings"
- run:
name: "Tests"
command: "cargo clean && cargo build && cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh"

View File

@@ -74,10 +74,6 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
@@ -138,7 +134,6 @@ database = "shard2"
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"

View File

@@ -108,24 +108,8 @@ cd ../..
pip3 install -r tests/python/requirements.txt
python3 tests/python/tests.py || exit 1
#
# Go tests
# Starts its own pgcat server
#
pushd tests/go
/usr/local/go/bin/go test || exit 1
popd
start_pgcat "info"
#
# Rust tests
#
cd tests/rust
cargo run
cd ../../
# Admin tests
export PGPASSWORD=admin_pass
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
packageVersion:
default: "1.1.2-dev1"
default: "1.1.1"
jobs:
build:
strategy:

1
.gitignore vendored
View File

@@ -10,4 +10,3 @@ lcov.info
dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv

View File

@@ -57,38 +57,6 @@ default: 86400000 # 24 hours
Max connection lifetime before it's closed, even if actively used.
### server_round_robin
```
path: general.server_round_robin
default: false
```
Whether to use round robin for server selection or not.
### server_tls
```
path: general.server_tls
default: false
```
Whether to use TLS for server connections or not.
### verify_server_certificate
```
path: general.verify_server_certificate
default: false
```
Whether to verify server certificate or not.
### verify_config
```
path: general.verify_config
default: true
```
Whether to verify config or not.
### idle_client_in_transaction_timeout
```
path: general.idle_client_in_transaction_timeout
@@ -226,39 +194,6 @@ default: "admin_pass"
Password to access the virtual administrative database
### auth_query
```
path: general.auth_query
default: <UNSET>
example: "SELECT $1"
```
Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
established using the database configured in the pool. This parameter is inherited by every pool
and can be redefined in pool configuration.
### auth_query_user
```
path: general.auth_query_user
default: <UNSET>
example: "sharding_user"
```
User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
This parameter is inherited by every pool and can be redefined in pool configuration.
### auth_query_password
```
path: general.auth_query_password
default: <UNSET>
example: "sharding_user"
```
Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
This parameter is inherited by every pool and can be redefined in pool configuration.
### dns_cache_enabled
```
path: general.dns_cache_enabled
@@ -308,15 +243,6 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified.
### prepared_statements_cache_size
```
path: general.prepared_statements_cache_size
default: 0
```
Size of the prepared statements cache. 0 means disabled.
TODO: update documentation
### query_parser_enabled
```
path: pools.<pool_name>.query_parser_enabled

View File

@@ -2,7 +2,7 @@
Thank you for contributing! Just a few tips here:
1. `cargo fmt` and `cargo clippy` your code before opening up a PR
1. `cargo fmt` your code before opening up a PR
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
3. Performance is important, make sure there are no regressions in your branch vs. `main`.

226
Cargo.lock generated
View File

@@ -17,17 +17,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "1.0.2"
@@ -37,12 +26,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
[[package]]
name = "android-tzdata"
version = "0.1.1"
@@ -60,15 +43,16 @@ dependencies = [
[[package]]
name = "anstream"
version = "0.6.8"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "628a8f9bd1e24b4e0db2b4bc2d000b001e7dd032d54afa60a68836aeec5aa54a"
checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is-terminal",
"utf8parse",
]
@@ -93,17 +77,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
dependencies = [
"windows-sys 0.48.0",
"windows-sys",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.2"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
dependencies = [
"anstyle",
"windows-sys 0.52.0",
"windows-sys",
]
[[package]]
@@ -202,6 +186,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42"
[[package]]
name = "block-buffer"
version = "0.10.4"
@@ -258,19 +248,20 @@ dependencies = [
[[package]]
name = "clap"
version = "4.4.18"
version = "4.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c"
checksum = "8f644d0dac522c8b05ddc39aaaccc5b136d5dc4ff216610c5641e3be5becf56c"
dependencies = [
"clap_builder",
"clap_derive",
"once_cell",
]
[[package]]
name = "clap_builder"
version = "4.4.18"
version = "4.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7"
checksum = "af410122b9778e024f9e0fb35682cc09cc3f85cad5e8d3ba8f47a9702df6e73d"
dependencies = [
"anstream",
"anstyle",
@@ -280,9 +271,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.4.7"
version = "4.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442"
checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050"
dependencies = [
"heck",
"proc-macro2",
@@ -292,9 +283,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "0.6.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
[[package]]
name = "colorchoice"
@@ -368,6 +359,27 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "exitcode"
version = "1.1.2"
@@ -541,10 +553,6 @@ name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
dependencies = [
"ahash",
"allocator-api2",
]
[[package]]
name = "heck"
@@ -708,7 +716,7 @@ checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f"
dependencies = [
"socket2 0.5.3",
"widestring",
"windows-sys 0.48.0",
"windows-sys",
"winreg",
]
@@ -718,6 +726,17 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
[[package]]
name = "is-terminal"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi",
"rustix",
"windows-sys",
]
[[package]]
name = "itertools"
version = "0.10.5"
@@ -780,6 +799,12 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0"
[[package]]
name = "lock_api"
version = "0.4.10"
@@ -796,15 +821,6 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "lru"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
@@ -876,7 +892,7 @@ checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.48.0",
"windows-sys",
]
[[package]]
@@ -885,7 +901,7 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"bitflags 1.3.2",
"cfg-if",
"libc",
"memoffset",
@@ -963,7 +979,7 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.48.1",
"windows-targets",
]
[[package]]
@@ -974,7 +990,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.1.1"
dependencies = [
"arc-swap",
"async-trait",
@@ -992,7 +1008,6 @@ dependencies = [
"itertools",
"jemallocator",
"log",
"lru",
"md-5",
"nix",
"num_cpus",
@@ -1181,7 +1196,7 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags",
"bitflags 1.3.2",
]
[[package]]
@@ -1259,6 +1274,19 @@ version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.38.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5"
dependencies = [
"bitflags 2.3.3",
"errno",
"libc",
"linux-raw-sys",
"windows-sys",
]
[[package]]
name = "rustls"
version = "0.21.5"
@@ -1282,9 +1310,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.100.2"
version = "0.100.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e98ff011474fa39949b7e5c0428f9b4937eda7da7848bbb947786b7be0b27dab"
checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b"
dependencies = [
"ring",
"untrusted",
@@ -1440,7 +1468,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys",
]
[[package]]
@@ -1593,7 +1621,7 @@ dependencies = [
"signal-hook-registry",
"socket2 0.4.9",
"tokio-macros",
"windows-sys 0.48.0",
"windows-sys",
]
[[package]]
@@ -1974,7 +2002,7 @@ version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
dependencies = [
"rustls-webpki 0.100.2",
"rustls-webpki 0.100.1",
]
[[package]]
@@ -2011,7 +2039,7 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
dependencies = [
"windows-targets 0.48.1",
"windows-targets",
]
[[package]]
@@ -2020,16 +2048,7 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.1",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.0",
"windows-targets",
]
[[package]]
@@ -2038,28 +2057,13 @@ version = "0.48.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f"
dependencies = [
"windows_aarch64_gnullvm 0.48.0",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm 0.48.0",
"windows_x86_64_msvc 0.48.0",
]
[[package]]
name = "windows-targets"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
dependencies = [
"windows_aarch64_gnullvm 0.52.0",
"windows_aarch64_msvc 0.52.0",
"windows_i686_gnu 0.52.0",
"windows_i686_msvc 0.52.0",
"windows_x86_64_gnu 0.52.0",
"windows_x86_64_gnullvm 0.52.0",
"windows_x86_64_msvc 0.52.0",
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
@@ -2068,84 +2072,42 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_gnu"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_i686_msvc"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
[[package]]
name = "winnow"
version = "0.5.0"
@@ -2162,5 +2124,5 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
"windows-sys",
]

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.1.1"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -45,10 +45,9 @@ trust-dns-resolver = "0.22.0"
tokio-test = "0.4.2"
serde_json = "1"
itertools = "0.10"
clap = { version = "4.4.18", features = ["derive", "env"] }
clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
lru = "0.12.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -1,19 +1,9 @@
FROM rust:1-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y build-essential
FROM rust:1 AS builder
COPY . /app
WORKDIR /app
RUN cargo build --release
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -o Dpkg::Options::=--force-confdef -yq --no-install-recommends \
postgresql-client \
# Clean up layer
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& truncate -s 0 /var/log/*log
FROM debian:bullseye-slim
COPY --from=builder /app/target/release/pgcat /usr/bin/pgcat
COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat

View File

@@ -1,6 +1,4 @@
FROM cimg/rust:1.67.1
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN sudo apt-get update && \
sudo apt-get install -y \
psmisc postgresql-contrib-14 postgresql-client-14 libpq-dev \
@@ -9,9 +7,6 @@ RUN sudo apt-get update && \
sudo apt-get upgrade curl && \
cargo install cargo-binutils rustfilt && \
rustup component add llvm-tools-preview && \
pip3 install psycopg2 && sudo gem install bundler && \
pip3 install psycopg2 && sudo gem install bundler && \
wget -O /tmp/toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i /tmp/toxiproxy-2.4.0.deb
RUN wget -O /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz

View File

@@ -1,25 +0,0 @@
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
RUN apt-get update && \
apt-get install -y build-essential
WORKDIR /app
FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Build dependencies - this is the caching Docker layer!
RUN cargo chef cook --release --recipe-path recipe.json
# Build application
COPY . .
RUN cargo build
FROM debian:bookworm-slim
COPY --from=builder /app/target/release/pgcat /usr/bin/pgcat
COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat
ENV RUST_LOG=info
CMD ["pgcat"]

View File

@@ -40,7 +40,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
<img src="./images/postgresml.webp" height="70" width="auto">
</a>
</td>
@@ -57,7 +57,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
PostgresML
</a>
</td>

View File

@@ -1,8 +1,6 @@
FROM rust:1.70-bullseye
# Dependencies
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN apt-get update -y \
&& apt-get install -y \
llvm-11 psmisc postgresql-contrib postgresql-client \

View File

@@ -71,10 +71,6 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.

View File

@@ -1,16 +0,0 @@
[Unit]
Description=PgCat pooler
After=network.target
StartLimitIntervalSec=0
[Service]
User=pgcat
Type=simple
Restart=always
RestartSec=1
Environment=RUST_LOG=info
LimitNOFILE=65536
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
[Install]
WantedBy=multi-user.target

View File

@@ -60,6 +60,12 @@ tcp_keepalives_count = 5
# Number of seconds between keepalive packets.
tcp_keepalives_interval = 5
# Handle prepared statements.
prepared_statements = true
# Prepared statements server cache size.
prepared_statements_cache_size = 500
# Path to TLS Certificate file to use for TLS connections
# tls_certificate = ".circleci/server.cert"
# Path to TLS private key file to use for TLS connections
@@ -150,20 +156,12 @@ load_balancing_mode = "random"
# `primary` all queries go to the primary unless otherwise specified.
default_role = "any"
# Prepared statements cache size.
# TODO: update documentation
prepared_statements_cache_size = 500
# If Query Parser is enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
@@ -175,12 +173,6 @@ primary_reads_enabled = true
# shard_id_regex = '/\* shard_id: (\d+) \*/'
# regex_search_limit = 1000 # only look at the first 1000 characters of SQL statements
# Defines the behavior when no shard is selected in a sharded system.
# `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# no_shard_specified_behavior = "shard_0"
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
# Current options:
@@ -191,7 +183,7 @@ sharding_function = "pg_bigint_hash"
# Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
# established using the database configured in the pool. This parameter is inherited by every pool
# and can be redefined in pool configuration.
# auth_query="SELECT usename, passwd FROM pg_shadow WHERE usename='$1'"
# auth_query = "SELECT $1"
# User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
# specified in `auth_query_user`. The connection will be established using the database configured in the pool.
@@ -301,8 +293,6 @@ username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 15000
connect_timeout = 1000
idle_timeout = 1000
# Shard configs are structured as pool.<pool_name>.shards.<shard_id>
# Each shard config contains a list of servers that make up the shard

View File

@@ -1,9 +0,0 @@
#!/bin/bash
set -e
systemctl daemon-reload
systemctl enable pgcat
if ! id pgcat 2> /dev/null; then
useradd -s /usr/bin/false pgcat
fi

4
postrm
View File

@@ -1,4 +0,0 @@
#!/bin/bash
set -e
systemctl daemon-reload

5
prerm
View File

@@ -1,5 +0,0 @@
#!/bin/bash
set -e
systemctl stop pgcat
systemctl disable pgcat

View File

@@ -1,5 +1,4 @@
use crate::pool::BanReason;
use crate::server::ServerParameters;
use crate::stats::pool::PoolStats;
use bytes::{Buf, BufMut, BytesMut};
use log::{error, info, trace};
@@ -18,16 +17,16 @@ use crate::pool::ClientServerMap;
use crate::pool::{get_all_pools, get_pool};
use crate::stats::{get_client_stats, get_server_stats, ClientState, ServerState};
pub fn generate_server_parameters_for_admin() -> ServerParameters {
let mut server_parameters = ServerParameters::new();
pub fn generate_server_info_for_admin() -> BytesMut {
let mut server_info = BytesMut::new();
server_parameters.set_param("application_name".to_string(), "".to_string(), true);
server_parameters.set_param("client_encoding".to_string(), "UTF8".to_string(), true);
server_parameters.set_param("server_encoding".to_string(), "UTF8".to_string(), true);
server_parameters.set_param("server_version".to_string(), VERSION.to_string(), true);
server_parameters.set_param("DateStyle".to_string(), "ISO, MDY".to_string(), true);
server_info.put(server_parameter_message("application_name", ""));
server_info.put(server_parameter_message("client_encoding", "UTF8"));
server_info.put(server_parameter_message("server_encoding", "UTF8"));
server_info.put(server_parameter_message("server_version", VERSION));
server_info.put(server_parameter_message("DateStyle", "ISO, MDY"));
server_parameters
server_info
}
/// Handle admin client.
@@ -74,11 +73,11 @@ where
}
"PAUSE" => {
trace!("PAUSE");
pause(stream, query_parts).await
pause(stream, query_parts[1]).await
}
"RESUME" => {
trace!("RESUME");
resume(stream, query_parts).await
resume(stream, query_parts[1]).await
}
"SHUTDOWN" => {
trace!("SHUTDOWN");
@@ -283,7 +282,7 @@ where
{
let mut res = BytesMut::new();
let detail_msg = [
let detail_msg = vec![
"",
"SHOW HELP|CONFIG|DATABASES|POOLS|CLIENTS|SERVERS|USERS|VERSION",
// "SHOW PEERS|PEER_POOLS", // missing PEERS|PEER_POOLS
@@ -301,6 +300,7 @@ where
// "KILL <db>",
// "SUSPEND",
"SHUTDOWN",
// "WAIT_CLOSE [<db>]", // missing
];
res.put(notify("Console usage", detail_msg.join("\n\t")));
@@ -690,8 +690,6 @@ where
("query_count", DataType::Numeric),
("error_count", DataType::Numeric),
("age_seconds", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
let new_map = get_client_stats();
@@ -699,7 +697,6 @@ where
res.put(row_description(&columns));
for (_, client) in new_map {
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
let row = vec![
format!("{:#010X}", client.client_id()),
client.pool_name(),
@@ -713,8 +710,6 @@ where
.duration_since(client.connect_time())
.as_secs()
.to_string(),
(max_wait / 1_000_000).to_string(),
(max_wait % 1_000_000).to_string(),
];
res.put(data_row(&row));
@@ -749,7 +744,6 @@ where
("age_seconds", DataType::Numeric),
("prepare_cache_hit", DataType::Numeric),
("prepare_cache_miss", DataType::Numeric),
("prepare_cache_eviction", DataType::Numeric),
("prepare_cache_size", DataType::Numeric),
];
@@ -782,10 +776,6 @@ where
.prepared_miss_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_eviction_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_cache_size
.load(Ordering::Relaxed)
@@ -806,128 +796,96 @@ where
}
/// Pause a pool. It won't pass any more queries to the backends.
async fn pause<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
async fn pause<T>(stream: &mut T, query: &str) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = match tokens.len() == 2 {
true => tokens[1].split(',').map(|part| part.trim()).collect(),
false => Vec::new(),
};
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
match parts.len() {
0 => {
for (_, pool) in get_all_pools() {
if parts.len() != 2 {
error_response(
stream,
"PAUSE requires a database and a user, e.g. PAUSE my_db,my_user",
)
.await
} else {
let database = parts[0];
let user = parts[1];
match get_pool(database, user) {
Some(pool) => {
pool.pause();
let mut res = BytesMut::new();
res.put(command_complete(&format!("PAUSE {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
let mut res = BytesMut::new();
res.put(command_complete("PAUSE"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
2 => {
let database = parts[0];
let user = parts[1];
match get_pool(database, user) {
Some(pool) => {
pool.pause();
let mut res = BytesMut::new();
res.put(command_complete(&format!("PAUSE {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
}
_ => error_response(stream, "usage: PAUSE [db, user]").await,
}
}
/// Resume a pool. Queries are allowed again.
async fn resume<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
async fn resume<T>(stream: &mut T, query: &str) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = match tokens.len() == 2 {
true => tokens[1].split(',').map(|part| part.trim()).collect(),
false => Vec::new(),
};
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
match parts.len() {
0 => {
for (_, pool) in get_all_pools() {
if parts.len() != 2 {
error_response(
stream,
"RESUME requires a database and a user, e.g. RESUME my_db,my_user",
)
.await
} else {
let database = parts[0];
let user = parts[1];
match get_pool(database, user) {
Some(pool) => {
pool.resume();
let mut res = BytesMut::new();
res.put(command_complete(&format!("RESUME {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
let mut res = BytesMut::new();
res.put(command_complete("RESUME"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
2 => {
let database = parts[0];
let user = parts[1];
match get_pool(database, user) {
Some(pool) => {
pool.resume();
let mut res = BytesMut::new();
res.put(command_complete(&format!("RESUME {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
}
_ => error_response(stream, "usage: RESUME [db, user]").await,
}
}

View File

@@ -79,8 +79,6 @@ impl AuthPassthrough {
pool_mode: None,
server_lifetime: None,
min_pool_size: None,
connect_timeout: None,
idle_timeout: None,
};
let user = &address.username;

File diff suppressed because it is too large Load Diff

View File

@@ -25,7 +25,7 @@ pub struct Args {
}
pub fn parse() -> Args {
Args::parse()
return Args::parse();
}
#[derive(ValueEnum, Clone, Debug)]

View File

@@ -3,14 +3,11 @@ use arc_swap::ArcSwap;
use log::{error, info};
use once_cell::sync::Lazy;
use regex::Regex;
use serde::{Deserializer, Serializer};
use serde_derive::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
@@ -104,9 +101,6 @@ pub struct Address {
/// Address stats
pub stats: Arc<AddressStats>,
/// Number of errors encountered since last successful checkout
pub error_count: Arc<AtomicU64>,
}
impl Default for Address {
@@ -116,15 +110,14 @@ impl Default for Address {
host: String::from("127.0.0.1"),
port: 5432,
shard: 0,
address_index: 0,
replica_number: 0,
database: String::from("database"),
role: Role::Replica,
replica_number: 0,
address_index: 0,
username: String::from("username"),
pool_name: String::from("pool_name"),
mirrors: Vec::new(),
stats: Arc::new(AddressStats::default()),
error_count: Arc::new(AtomicU64::new(0)),
}
}
}
@@ -189,18 +182,6 @@ impl Address {
),
}
}
pub fn error_count(&self) -> u64 {
self.error_count.load(Ordering::Relaxed)
}
pub fn increment_error_count(&self) {
self.error_count.fetch_add(1, Ordering::Relaxed);
}
pub fn reset_error_count(&self) {
self.error_count.store(0, Ordering::Relaxed);
}
}
/// PostgreSQL user.
@@ -216,8 +197,6 @@ pub struct User {
pub server_lifetime: Option<u64>,
#[serde(default)] // 0
pub statement_timeout: u64,
pub connect_timeout: Option<u64>,
pub idle_timeout: Option<u64>,
}
impl Default for User {
@@ -232,22 +211,24 @@ impl Default for User {
statement_timeout: 0,
pool_mode: None,
server_lifetime: None,
connect_timeout: None,
idle_timeout: None,
}
}
}
impl User {
fn validate(&self) -> Result<(), Error> {
if let Some(min_pool_size) = self.min_pool_size {
if min_pool_size > self.pool_size {
error!(
"min_pool_size of {} cannot be larger than pool_size of {}",
min_pool_size, self.pool_size
);
return Err(Error::BadConfig);
match self.min_pool_size {
Some(min_pool_size) => {
if min_pool_size > self.pool_size {
error!(
"min_pool_size of {} cannot be larger than pool_size of {}",
min_pool_size, self.pool_size
);
return Err(Error::BadConfig);
}
}
None => (),
};
Ok(())
@@ -341,6 +322,12 @@ pub struct General {
pub auth_query: Option<String>,
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,
#[serde(default)]
pub prepared_statements: bool,
#[serde(default = "General::default_prepared_statements_cache_size")]
pub prepared_statements_cache_size: usize,
}
impl General {
@@ -422,6 +409,10 @@ impl General {
pub fn default_server_round_robin() -> bool {
true
}
pub fn default_prepared_statements_cache_size() -> usize {
500
}
}
impl Default for General {
@@ -433,33 +424,35 @@ impl Default for General {
prometheus_exporter_port: 9930,
connect_timeout: General::default_connect_timeout(),
idle_timeout: General::default_idle_timeout(),
shutdown_timeout: Self::default_shutdown_timeout(),
healthcheck_timeout: Self::default_healthcheck_timeout(),
healthcheck_delay: Self::default_healthcheck_delay(),
ban_time: Self::default_ban_time(),
worker_threads: Self::default_worker_threads(),
idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(),
tcp_keepalives_idle: Self::default_tcp_keepalives_idle(),
tcp_keepalives_count: Self::default_tcp_keepalives_count(),
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
tcp_user_timeout: Self::default_tcp_user_timeout(),
log_client_connections: false,
log_client_disconnections: false,
autoreload: None,
dns_cache_enabled: false,
dns_max_ttl: Self::default_dns_max_ttl(),
shutdown_timeout: Self::default_shutdown_timeout(),
healthcheck_timeout: Self::default_healthcheck_timeout(),
healthcheck_delay: Self::default_healthcheck_delay(),
ban_time: Self::default_ban_time(),
idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(),
server_lifetime: Self::default_server_lifetime(),
server_round_robin: Self::default_server_round_robin(),
worker_threads: Self::default_worker_threads(),
autoreload: None,
tls_certificate: None,
tls_private_key: None,
server_tls: false,
verify_server_certificate: false,
admin_username: String::from("admin"),
admin_password: String::from("admin"),
validate_config: true,
auth_query: None,
auth_query_user: None,
auth_query_password: None,
server_lifetime: Self::default_server_lifetime(),
server_round_robin: Self::default_server_round_robin(),
validate_config: true,
prepared_statements: false,
prepared_statements_cache_size: 500,
}
}
}
@@ -518,11 +511,6 @@ pub struct Pool {
#[serde(default)] // False
pub query_parser_enabled: bool,
pub query_parser_max_length: Option<usize>,
#[serde(default)] // False
pub query_parser_read_write_splitting: bool,
#[serde(default)] // False
pub primary_reads_enabled: bool,
@@ -547,9 +535,6 @@ pub struct Pool {
pub shard_id_regex: Option<String>,
pub regex_search_limit: Option<usize>,
#[serde(default = "Pool::default_default_shard")]
pub default_shard: DefaultShard,
pub auth_query: Option<String>,
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,
@@ -557,12 +542,6 @@ pub struct Pool {
#[serde(default = "Pool::default_cleanup_server_connections")]
pub cleanup_server_connections: bool,
#[serde(default)] // False
pub log_client_parameter_status_changes: bool,
#[serde(default = "Pool::default_prepared_statements_cache_size")]
pub prepared_statements_cache_size: usize,
pub plugins: Option<Plugins>,
pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>,
@@ -588,10 +567,6 @@ impl Pool {
PoolMode::Transaction
}
pub fn default_default_shard() -> DefaultShard {
DefaultShard::default()
}
pub fn default_load_balancing_mode() -> LoadBalancingMode {
LoadBalancingMode::Random
}
@@ -612,10 +587,6 @@ impl Pool {
true
}
pub fn default_prepared_statements_cache_size() -> usize {
0
}
pub fn validate(&mut self) -> Result<(), Error> {
match self.default_role.as_ref() {
"any" => (),
@@ -656,25 +627,13 @@ impl Pool {
}
}
if self.query_parser_read_write_splitting && !self.query_parser_enabled {
error!(
"query_parser_read_write_splitting is only valid when query_parser_enabled is true"
);
return Err(Error::BadConfig);
}
if self.plugins.is_some() && !self.query_parser_enabled {
error!("plugins are only valid when query_parser_enabled is true");
return Err(Error::BadConfig);
}
self.automatic_sharding_key = match &self.automatic_sharding_key {
Some(key) => {
// No quotes in the key so we don't have to compare quoted
// to unquoted idents.
let key = key.replace('\"', "");
let key = key.replace("\"", "");
if key.split('.').count() != 2 {
if key.split(".").count() != 2 {
error!(
"automatic_sharding_key '{}' must be fully qualified, e.g. t.{}`",
key, key
@@ -687,14 +646,7 @@ impl Pool {
None => None,
};
if let DefaultShard::Shard(shard_number) = self.default_shard {
if shard_number >= self.shards.len() {
error!("Invalid shard {:?}", shard_number);
return Err(Error::BadConfig);
}
}
for user in self.users.values() {
for (_, user) in &self.users {
user.validate()?;
}
@@ -707,29 +659,24 @@ impl Default for Pool {
Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
users: BTreeMap::default(),
default_role: String::from("any"),
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: false,
connect_timeout: None,
idle_timeout: None,
server_lifetime: None,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
connect_timeout: None,
idle_timeout: None,
sharding_key_regex: None,
shard_id_regex: None,
regex_search_limit: Some(1000),
default_shard: Self::default_default_shard(),
auth_query: None,
auth_query_user: None,
auth_query_password: None,
cleanup_server_connections: true,
log_client_parameter_status_changes: false,
prepared_statements_cache_size: Self::default_prepared_statements_cache_size(),
server_lifetime: None,
plugins: None,
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
users: BTreeMap::default(),
cleanup_server_connections: true,
}
}
}
@@ -741,50 +688,6 @@ pub struct ServerConfig {
pub role: Role,
}
// No Shard Specified handling.
#[derive(Debug, PartialEq, Clone, Eq, Hash, Copy)]
pub enum DefaultShard {
Shard(usize),
Random,
RandomHealthy,
}
impl Default for DefaultShard {
fn default() -> Self {
DefaultShard::Shard(0)
}
}
impl serde::Serialize for DefaultShard {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
match self {
DefaultShard::Shard(shard) => {
serializer.serialize_str(&format!("shard_{}", &shard.to_string()))
}
DefaultShard::Random => serializer.serialize_str("random"),
DefaultShard::RandomHealthy => serializer.serialize_str("random_healthy"),
}
}
}
impl<'de> serde::Deserialize<'de> for DefaultShard {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
if let Some(s) = s.strip_prefix("shard_") {
let shard = s.parse::<usize>().map_err(serde::de::Error::custom)?;
return Ok(DefaultShard::Shard(shard));
}
match s.as_str() {
"random" => Ok(DefaultShard::Random),
"random_healthy" => Ok(DefaultShard::RandomHealthy),
_ => Err(serde::de::Error::custom(
"invalid value for no_shard_specified_behavior",
)),
}
}
}
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)]
pub struct MirrorServerConfig {
pub host: String,
@@ -841,13 +744,13 @@ impl Shard {
impl Default for Shard {
fn default() -> Shard {
Shard {
database: String::from("postgres"),
mirrors: None,
servers: vec![ServerConfig {
host: String::from("localhost"),
port: 5432,
role: Role::Primary,
}],
mirrors: None,
database: String::from("postgres"),
}
}
}
@@ -860,26 +763,15 @@ pub struct Plugins {
pub prewarmer: Option<Prewarmer>,
}
pub trait Plugin {
fn is_enabled(&self) -> bool;
}
impl std::fmt::Display for Plugins {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn is_enabled<T: Plugin>(arg: Option<&T>) -> bool {
if let Some(arg) = arg {
arg.is_enabled()
} else {
false
}
}
write!(
f,
"interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}",
is_enabled(self.intercept.as_ref()),
is_enabled(self.table_access.as_ref()),
is_enabled(self.query_logger.as_ref()),
is_enabled(self.prewarmer.as_ref()),
self.intercept.is_some(),
self.table_access.is_some(),
self.query_logger.is_some(),
self.prewarmer.is_some(),
)
}
}
@@ -890,47 +782,23 @@ pub struct Intercept {
pub queries: BTreeMap<String, Query>,
}
impl Plugin for Intercept {
fn is_enabled(&self) -> bool {
self.enabled
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct TableAccess {
pub enabled: bool,
pub tables: Vec<String>,
}
impl Plugin for TableAccess {
fn is_enabled(&self) -> bool {
self.enabled
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct QueryLogger {
pub enabled: bool,
}
impl Plugin for QueryLogger {
fn is_enabled(&self) -> bool {
self.enabled
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
pub struct Prewarmer {
pub enabled: bool,
pub queries: Vec<String>,
}
impl Plugin for Prewarmer {
fn is_enabled(&self) -> bool {
self.enabled
}
}
impl Intercept {
pub fn substitute(&mut self, db: &str, user: &str) {
for (_, query) in self.queries.iter_mut() {
@@ -948,7 +816,6 @@ pub struct Query {
}
impl Query {
#[allow(clippy::needless_range_loop)]
pub fn substitute(&mut self, db: &str, user: &str) {
for col in self.result.iter_mut() {
for i in 0..col.len() {
@@ -1018,8 +885,8 @@ impl Default for Config {
Config {
path: Self::default_path(),
general: General::default(),
plugins: None,
pools: HashMap::default(),
plugins: None,
}
}
}
@@ -1047,17 +914,6 @@ impl From<&Config> for std::collections::HashMap<String, String> {
format!("pools.{}.query_parser_enabled", pool_name),
pool.query_parser_enabled.to_string(),
),
(
format!("pools.{}.query_parser_max_length", pool_name),
match pool.query_parser_max_length {
Some(max_length) => max_length.to_string(),
None => String::from("unlimited"),
},
),
(
format!("pools.{}.query_parser_read_write_splitting", pool_name),
pool.query_parser_read_write_splitting.to_string(),
),
(
format!("pools.{}.default_role", pool_name),
pool.default_role.clone(),
@@ -1073,8 +929,8 @@ impl From<&Config> for std::collections::HashMap<String, String> {
(
format!("pools.{:?}.users", pool_name),
pool.users
.values()
.map(|user| &user.username)
.iter()
.map(|(_username, user)| &user.username)
.cloned()
.collect::<Vec<String>>()
.join(", "),
@@ -1128,7 +984,6 @@ impl From<&Config> for std::collections::HashMap<String, String> {
impl Config {
/// Print current configuration.
pub fn show(&self) {
info!("Config path: {}", self.path);
info!("Ban time: {}s", self.general.ban_time);
info!(
"Idle client in transaction timeout: {}ms",
@@ -1160,9 +1015,13 @@ impl Config {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);
if let Some(tls_private_key) = self.general.tls_private_key.clone() {
info!("TLS private key: {}", tls_private_key);
info!("TLS support is enabled");
match self.general.tls_private_key.clone() {
Some(tls_private_key) => {
info!("TLS private key: {}", tls_private_key);
info!("TLS support is enabled");
}
None => (),
}
}
@@ -1175,6 +1034,13 @@ impl Config {
"Server TLS certificate verification: {}",
self.general.verify_server_certificate
);
info!("Prepared statements: {}", self.general.prepared_statements);
if self.general.prepared_statements {
info!(
"Prepared statements server cache size: {}",
self.general.prepared_statements_cache_size
);
}
info!(
"Plugins: {}",
match self.plugins {
@@ -1190,8 +1056,8 @@ impl Config {
pool_name,
pool_config
.users
.values()
.map(|user_cfg| user_cfg.pool_size)
.iter()
.map(|(_, user_cfg)| user_cfg.pool_size)
.sum::<u32>()
.to_string()
);
@@ -1230,15 +1096,6 @@ impl Config {
"[pool: {}] Query router: {}",
pool_name, pool_config.query_parser_enabled
);
info!(
"[pool: {}] Query parser max length: {:?}",
pool_name, pool_config.query_parser_max_length
);
info!(
"[pool: {}] Infer role from query: {}",
pool_name, pool_config.query_parser_read_write_splitting
);
info!(
"[pool: {}] Number of shards: {}",
pool_name,
@@ -1261,14 +1118,6 @@ impl Config {
"[pool: {}] Cleanup server connections: {}",
pool_name, pool_config.cleanup_server_connections
);
info!(
"[pool: {}] Log client parameter status changes: {}",
pool_name, pool_config.log_client_parameter_status_changes
);
info!(
"[pool: {}] Prepared statements server cache size: {}",
pool_name, pool_config.prepared_statements_cache_size
);
info!(
"[pool: {}] Plugins: {}",
pool_name,
@@ -1311,24 +1160,6 @@ impl Config {
None => "default".to_string(),
}
);
info!(
"[pool: {}][user: {}] Connection timeout: {}",
pool_name,
user.1.username,
match user.1.connect_timeout {
Some(connect_timeout) => format!("{}ms", connect_timeout),
None => "not set".to_string(),
}
);
info!(
"[pool: {}][user: {}] Idle timeout: {}",
pool_name,
user.1.username,
match user.1.idle_timeout {
Some(idle_timeout) => format!("{}ms", idle_timeout),
None => "not set".to_string(),
}
);
}
}
}
@@ -1383,31 +1214,34 @@ impl Config {
}
// Validate TLS!
if let Some(tls_certificate) = self.general.tls_certificate.clone() {
match load_certs(Path::new(&tls_certificate)) {
Ok(_) => {
// Cert is okay, but what about the private key?
match self.general.tls_private_key.clone() {
Some(tls_private_key) => match load_keys(Path::new(&tls_private_key)) {
Ok(_) => (),
Err(err) => {
error!("tls_private_key is incorrectly configured: {:?}", err);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
match load_certs(Path::new(&tls_certificate)) {
Ok(_) => {
// Cert is okay, but what about the private key?
match self.general.tls_private_key.clone() {
Some(tls_private_key) => match load_keys(Path::new(&tls_private_key)) {
Ok(_) => (),
Err(err) => {
error!("tls_private_key is incorrectly configured: {:?}", err);
return Err(Error::BadConfig);
}
},
None => {
error!("tls_certificate is set, but the tls_private_key is not");
return Err(Error::BadConfig);
}
},
};
}
None => {
error!("tls_certificate is set, but the tls_private_key is not");
return Err(Error::BadConfig);
}
};
}
Err(err) => {
error!("tls_certificate is incorrectly configured: {:?}", err);
return Err(Error::BadConfig);
Err(err) => {
error!("tls_certificate is incorrectly configured: {:?}", err);
return Err(Error::BadConfig);
}
}
}
None => (),
};
for pool in self.pools.values_mut() {
@@ -1429,6 +1263,14 @@ pub fn get_idle_client_in_transaction_timeout() -> u64 {
CONFIG.load().general.idle_client_in_transaction_timeout
}
pub fn get_prepared_statements() -> bool {
CONFIG.load().general.prepared_statements
}
pub fn get_prepared_statements_cache_size() -> usize {
CONFIG.load().general.prepared_statements_cache_size
}
/// Parse the configuration file located at the path.
pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new();

View File

@@ -12,7 +12,6 @@ pub enum Error {
ProtocolSyncError(String),
BadQuery(String),
ServerError,
ServerMessageParserError(String),
ServerStartupError(String, ServerIdentifier),
ServerAuthError(String, ServerIdentifier),
BadConfig,
@@ -28,8 +27,6 @@ pub enum Error {
UnsupportedStatement,
QueryRouterParserError(String),
QueryRouterError(String),
InvalidShardId(usize),
PreparedStatementError,
}
#[derive(Clone, PartialEq, Debug)]

View File

@@ -8,7 +8,6 @@ pub fn init(args: &Args) {
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());
let trace_sub = tracing_subscriber::fmt()
.with_thread_ids(true)
.with_env_filter(filter)
.with_ansi(!args.no_color);

View File

@@ -11,17 +11,11 @@ use crate::client::PREPARED_STATEMENT_COUNTER;
use crate::config::get_config;
use crate::errors::Error;
use crate::constants::MESSAGE_TERMINATOR;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::ffi::CString;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::io::{BufRead, Cursor};
use std::mem;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
/// Postgres data type mappings
@@ -117,11 +111,19 @@ pub fn simple_query(query: &str) -> BytesMut {
}
/// Tell the client we're ready for another query.
pub async fn send_ready_for_query<S>(stream: &mut S) -> Result<(), Error>
pub async fn ready_for_query<S>(stream: &mut S) -> Result<(), Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
write_all(stream, ready_for_query(false)).await
let mut bytes = BytesMut::with_capacity(
mem::size_of::<u8>() + mem::size_of::<i32>() + mem::size_of::<u8>(),
);
bytes.put_u8(b'Z');
bytes.put_i32(5);
bytes.put_u8(b'I'); // Idle
write_all(stream, bytes).await
}
/// Send the startup packet the server. We're pretending we're a Pg client.
@@ -139,10 +141,6 @@ where
bytes.put_slice(user.as_bytes());
bytes.put_u8(0);
// Application name
bytes.put(&b"application_name\0"[..]);
bytes.put_slice(&b"pgcat\0"[..]);
// Database
bytes.put(&b"database\0"[..]);
bytes.put_slice(database.as_bytes());
@@ -158,10 +156,12 @@ where
match stream.write_all(&startup).await {
Ok(_) => Ok(()),
Err(err) => Err(Error::SocketError(format!(
"Error writing startup to server socket - Error: {:?}",
err
))),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing startup to server socket - Error: {:?}",
err
)))
}
}
}
@@ -237,8 +237,8 @@ pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec<u8> {
let mut md5 = Md5::new();
// First pass
md5.update(password.as_bytes());
md5.update(user.as_bytes());
md5.update(&password.as_bytes());
md5.update(&user.as_bytes());
let output = md5.finalize_reset();
@@ -274,7 +274,7 @@ where
{
let password = md5_hash_password(user, password, salt);
let mut message = BytesMut::with_capacity(password.len() + 5);
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
message.put_u8(b'p');
message.put_i32(password.len() as i32 + 4);
@@ -288,7 +288,7 @@ where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
let password = md5_hash_second_pass(hash, salt);
let mut message = BytesMut::with_capacity(password.len() + 5);
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
message.put_u8(b'p');
message.put_i32(password.len() as i32 + 4);
@@ -315,7 +315,7 @@ where
res.put_slice(&set_complete[..]);
write_all_half(stream, &res).await?;
send_ready_for_query(stream).await
ready_for_query(stream).await
}
/// Send a custom error message to the client.
@@ -326,7 +326,7 @@ where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
error_response_terminal(stream, message).await?;
send_ready_for_query(stream).await
ready_for_query(stream).await
}
/// Send a custom error message to the client.
@@ -427,7 +427,7 @@ where
res.put(command_complete("SELECT 1"));
write_all_half(stream, &res).await?;
send_ready_for_query(stream).await
ready_for_query(stream).await
}
pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
@@ -509,7 +509,7 @@ pub fn data_row_nullable(row: &Vec<Option<String>>) -> BytesMut {
data_row.put_i32(column.len() as i32);
data_row.put_slice(column);
} else {
data_row.put_i32(-1_i32);
data_row.put_i32(-1 as i32);
}
}
@@ -557,37 +557,6 @@ pub fn flush() -> BytesMut {
bytes
}
pub fn sync() -> BytesMut {
let mut bytes = BytesMut::with_capacity(mem::size_of::<u8>() + mem::size_of::<i32>());
bytes.put_u8(b'S');
bytes.put_i32(4);
bytes
}
pub fn parse_complete() -> BytesMut {
let mut bytes = BytesMut::with_capacity(mem::size_of::<u8>() + mem::size_of::<i32>());
bytes.put_u8(b'1');
bytes.put_i32(4);
bytes
}
pub fn ready_for_query(in_transaction: bool) -> BytesMut {
let mut bytes = BytesMut::with_capacity(
mem::size_of::<u8>() + mem::size_of::<i32>() + mem::size_of::<u8>(),
);
bytes.put_u8(b'Z');
bytes.put_i32(5);
if in_transaction {
bytes.put_u8(b'T');
} else {
bytes.put_u8(b'I');
}
bytes
}
/// Write all data in the buffer to the TcpStream.
pub async fn write_all<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
where
@@ -595,10 +564,12 @@ where
{
match stream.write_all(&buf).await {
Ok(_) => Ok(()),
Err(err) => Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
))),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
}
}
@@ -609,10 +580,12 @@ where
{
match stream.write_all(buf).await {
Ok(_) => Ok(()),
Err(err) => Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
))),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
}
}
@@ -623,15 +596,19 @@ where
match stream.write_all(buf).await {
Ok(_) => match stream.flush().await {
Ok(_) => Ok(()),
Err(err) => Err(Error::SocketError(format!(
"Error flushing socket - Error: {:?}",
err
))),
Err(err) => {
return Err(Error::SocketError(format!(
"Error flushing socket - Error: {:?}",
err
)))
}
},
Err(err) => Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
))),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
}
}
@@ -746,71 +723,11 @@ impl BytesMutReader for Cursor<&BytesMut> {
let mut buf = vec![];
match self.read_until(b'\0', &mut buf) {
Ok(_) => Ok(String::from_utf8_lossy(&buf[..buf.len() - 1]).to_string()),
Err(err) => Err(Error::ParseBytesError(err.to_string())),
Err(err) => return Err(Error::ParseBytesError(err.to_string())),
}
}
}
impl BytesMutReader for BytesMut {
/// Should only be used when reading strings from the message protocol.
/// Can be used to read multiple strings from the same message which are separated by the null byte
fn read_string(&mut self) -> Result<String, Error> {
let null_index = self.iter().position(|&byte| byte == b'\0');
match null_index {
Some(index) => {
let string_bytes = self.split_to(index + 1);
Ok(String::from_utf8_lossy(&string_bytes[..string_bytes.len() - 1]).to_string())
}
None => Err(Error::ParseBytesError("Could not read string".to_string())),
}
}
}
pub enum ExtendedProtocolData {
Parse {
data: BytesMut,
metadata: Option<(Arc<Parse>, u64)>,
},
Bind {
data: BytesMut,
metadata: Option<String>,
},
Describe {
data: BytesMut,
metadata: Option<String>,
},
Execute {
data: BytesMut,
},
Close {
data: BytesMut,
close: Close,
},
}
impl ExtendedProtocolData {
pub fn create_new_parse(data: BytesMut, metadata: Option<(Arc<Parse>, u64)>) -> Self {
Self::Parse { data, metadata }
}
pub fn create_new_bind(data: BytesMut, metadata: Option<String>) -> Self {
Self::Bind { data, metadata }
}
pub fn create_new_describe(data: BytesMut, metadata: Option<String>) -> Self {
Self::Describe { data, metadata }
}
pub fn create_new_execute(data: BytesMut) -> Self {
Self::Execute { data }
}
pub fn create_new_close(data: BytesMut, close: Close) -> Self {
Self::Close { data, close }
}
}
/// Parse (F) message.
/// See: <https://www.postgresql.org/docs/current/protocol-message-formats.html>
#[derive(Clone, Debug)]
@@ -819,6 +736,7 @@ pub struct Parse {
#[allow(dead_code)]
len: i32,
pub name: String,
pub generated_name: String,
query: String,
num_params: i16,
param_types: Vec<i32>,
@@ -844,6 +762,7 @@ impl TryFrom<&BytesMut> for Parse {
code,
len,
name,
generated_name: prepared_statement_name(),
query,
num_params,
param_types,
@@ -892,44 +811,11 @@ impl TryFrom<&Parse> for BytesMut {
}
impl Parse {
/// Renames the prepared statement to a new name based on the global counter
pub fn rewrite(mut self) -> Self {
self.name = format!(
"PGCAT_{}",
PREPARED_STATEMENT_COUNTER.fetch_add(1, Ordering::SeqCst)
);
pub fn rename(mut self) -> Self {
self.name = self.generated_name.to_string();
self
}
/// Gets the name of the prepared statement from the buffer
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
let mut cursor = Cursor::new(buf);
// Skip the code and length
cursor.advance(mem::size_of::<u8>() + mem::size_of::<i32>());
cursor.read_string()
}
/// Hashes the parse statement to be used as a key in the global cache
pub fn get_hash(&self) -> u64 {
// TODO_ZAIN: Take a look at which hashing function is being used
let mut hasher = DefaultHasher::new();
let concatenated = format!(
"{}{}{}",
self.query,
self.num_params,
self.param_types
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(",")
);
concatenated.hash(&mut hasher);
hasher.finish()
}
pub fn anonymous(&self) -> bool {
self.name.is_empty()
}
@@ -1060,42 +946,9 @@ impl TryFrom<Bind> for BytesMut {
}
impl Bind {
/// Gets the name of the prepared statement from the buffer
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
let mut cursor = Cursor::new(buf);
// Skip the code and length
cursor.advance(mem::size_of::<u8>() + mem::size_of::<i32>());
cursor.read_string()?;
cursor.read_string()
}
/// Renames the prepared statement to a new name
pub fn rename(buf: BytesMut, new_name: &str) -> Result<BytesMut, Error> {
let mut cursor = Cursor::new(&buf);
// Read basic data from the cursor
let code = cursor.get_u8();
let current_len = cursor.get_i32();
let portal = cursor.read_string()?;
let prepared_statement = cursor.read_string()?;
// Calculate new length
let new_len = current_len + new_name.len() as i32 - prepared_statement.len() as i32;
// Begin building the response buffer
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
response_buf.put_u8(code);
response_buf.put_i32(new_len);
// Put the portal and new name into the buffer
// Note: panic if the provided string contains null byte
response_buf.put_slice(CString::new(portal)?.as_bytes_with_nul());
response_buf.put_slice(CString::new(new_name)?.as_bytes_with_nul());
// Add the remainder of the original buffer into the response
response_buf.put_slice(&buf[cursor.position() as usize..]);
// Return the buffer
Ok(response_buf)
pub fn reassign(mut self, parse: &Parse) -> Self {
self.prepared_statement = parse.name.clone();
self
}
pub fn anonymous(&self) -> bool {
@@ -1109,7 +962,7 @@ pub struct Describe {
#[allow(dead_code)]
len: i32,
pub target: char,
target: char,
pub statement_name: String,
}
@@ -1151,15 +1004,6 @@ impl TryFrom<Describe> for BytesMut {
}
impl Describe {
pub fn empty_new() -> Describe {
Describe {
code: 'D',
len: 4 + 1 + 1,
target: 'S',
statement_name: "".to_string(),
}
}
pub fn rename(mut self, name: &str) -> Self {
self.statement_name = name.to_string();
self
@@ -1248,297 +1092,9 @@ pub fn close_complete() -> BytesMut {
bytes
}
// from https://www.postgresql.org/docs/12/protocol-error-fields.html
#[derive(Debug, Default, PartialEq)]
pub struct PgErrorMsg {
pub severity_localized: String, // S
pub severity: String, // V
pub code: String, // C
pub message: String, // M
pub detail: Option<String>, // D
pub hint: Option<String>, // H
pub position: Option<u32>, // P
pub internal_position: Option<u32>, // p
pub internal_query: Option<String>, // q
pub where_context: Option<String>, // W
pub schema_name: Option<String>, // s
pub table_name: Option<String>, // t
pub column_name: Option<String>, // c
pub data_type_name: Option<String>, // d
pub constraint_name: Option<String>, // n
pub file_name: Option<String>, // F
pub line: Option<u32>, // L
pub routine: Option<String>, // R
}
// TODO: implement with https://docs.rs/derive_more/latest/derive_more/
impl Display for PgErrorMsg {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "[severity: {}]", self.severity)?;
write!(f, "[code: {}]", self.code)?;
write!(f, "[message: {}]", self.message)?;
if let Some(val) = &self.detail {
write!(f, "[detail: {val}]")?;
}
if let Some(val) = &self.hint {
write!(f, "[hint: {val}]")?;
}
if let Some(val) = &self.position {
write!(f, "[position: {val}]")?;
}
if let Some(val) = &self.internal_position {
write!(f, "[internal_position: {val}]")?;
}
if let Some(val) = &self.internal_query {
write!(f, "[internal_query: {val}]")?;
}
if let Some(val) = &self.internal_query {
write!(f, "[internal_query: {val}]")?;
}
if let Some(val) = &self.where_context {
write!(f, "[where: {val}]")?;
}
if let Some(val) = &self.schema_name {
write!(f, "[schema_name: {val}]")?;
}
if let Some(val) = &self.table_name {
write!(f, "[table_name: {val}]")?;
}
if let Some(val) = &self.column_name {
write!(f, "[column_name: {val}]")?;
}
if let Some(val) = &self.data_type_name {
write!(f, "[data_type_name: {val}]")?;
}
if let Some(val) = &self.constraint_name {
write!(f, "[constraint_name: {val}]")?;
}
if let Some(val) = &self.file_name {
write!(f, "[file_name: {val}]")?;
}
if let Some(val) = &self.line {
write!(f, "[line: {val}]")?;
}
if let Some(val) = &self.routine {
write!(f, "[routine: {val}]")?;
}
write!(f, " ")?;
Ok(())
}
}
impl PgErrorMsg {
pub fn parse(error_msg: &[u8]) -> Result<PgErrorMsg, Error> {
let mut out = PgErrorMsg {
severity_localized: "".to_string(),
severity: "".to_string(),
code: "".to_string(),
message: "".to_string(),
detail: None,
hint: None,
position: None,
internal_position: None,
internal_query: None,
where_context: None,
schema_name: None,
table_name: None,
column_name: None,
data_type_name: None,
constraint_name: None,
file_name: None,
line: None,
routine: None,
};
for msg_part in error_msg.split(|v| *v == MESSAGE_TERMINATOR) {
if msg_part.is_empty() {
continue;
}
let msg_content = match String::from_utf8_lossy(&msg_part[1..]).parse() {
Ok(c) => c,
Err(err) => {
return Err(Error::ServerMessageParserError(format!(
"could not parse server message field. err {:?}",
err
)))
}
};
match &msg_part[0] {
b'S' => {
out.severity_localized = msg_content;
}
b'V' => {
out.severity = msg_content;
}
b'C' => {
out.code = msg_content;
}
b'M' => {
out.message = msg_content;
}
b'D' => {
out.detail = Some(msg_content);
}
b'H' => {
out.hint = Some(msg_content);
}
b'P' => out.position = Some(u32::from_str(msg_content.as_str()).unwrap_or(0)),
b'p' => {
out.internal_position = Some(u32::from_str(msg_content.as_str()).unwrap_or(0))
}
b'q' => {
out.internal_query = Some(msg_content);
}
b'W' => {
out.where_context = Some(msg_content);
}
b's' => {
out.schema_name = Some(msg_content);
}
b't' => {
out.table_name = Some(msg_content);
}
b'c' => {
out.column_name = Some(msg_content);
}
b'd' => {
out.data_type_name = Some(msg_content);
}
b'n' => {
out.constraint_name = Some(msg_content);
}
b'F' => {
out.file_name = Some(msg_content);
}
b'L' => out.line = Some(u32::from_str(msg_content.as_str()).unwrap_or(0)),
b'R' => {
out.routine = Some(msg_content);
}
_ => {}
}
}
Ok(out)
}
}
#[cfg(test)]
mod tests {
use crate::messages::PgErrorMsg;
use log::{error, info};
fn field(kind: char, content: &str) -> Vec<u8> {
format!("{kind}{content}\0").as_bytes().to_vec()
}
#[test]
fn parse_fields() {
let mut complete_msg = vec![];
let severity = "FATAL";
complete_msg.extend(field('S', severity));
complete_msg.extend(field('V', severity));
let error_code = "29P02";
complete_msg.extend(field('C', error_code));
let message = "password authentication failed for user \"wrong_user\"";
complete_msg.extend(field('M', message));
let detail_msg = "super detailed message";
complete_msg.extend(field('D', detail_msg));
let hint_msg = "hint detail here";
complete_msg.extend(field('H', hint_msg));
complete_msg.extend(field('P', "123"));
complete_msg.extend(field('p', "234"));
let internal_query = "SELECT * from foo;";
complete_msg.extend(field('q', internal_query));
let where_msg = "where goes here";
complete_msg.extend(field('W', where_msg));
let schema_msg = "schema_name";
complete_msg.extend(field('s', schema_msg));
let table_msg = "table_name";
complete_msg.extend(field('t', table_msg));
let column_msg = "column_name";
complete_msg.extend(field('c', column_msg));
let data_type_msg = "type_name";
complete_msg.extend(field('d', data_type_msg));
let constraint_msg = "constraint_name";
complete_msg.extend(field('n', constraint_msg));
let file_msg = "pgcat.c";
complete_msg.extend(field('F', file_msg));
complete_msg.extend(field('L', "335"));
let routine_msg = "my_failing_routine";
complete_msg.extend(field('R', routine_msg));
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_ansi(true)
.init();
info!(
"full message: {}",
PgErrorMsg::parse(&complete_msg).unwrap()
);
assert_eq!(
PgErrorMsg {
severity_localized: severity.to_string(),
severity: severity.to_string(),
code: error_code.to_string(),
message: message.to_string(),
detail: Some(detail_msg.to_string()),
hint: Some(hint_msg.to_string()),
position: Some(123),
internal_position: Some(234),
internal_query: Some(internal_query.to_string()),
where_context: Some(where_msg.to_string()),
schema_name: Some(schema_msg.to_string()),
table_name: Some(table_msg.to_string()),
column_name: Some(column_msg.to_string()),
data_type_name: Some(data_type_msg.to_string()),
constraint_name: Some(constraint_msg.to_string()),
file_name: Some(file_msg.to_string()),
line: Some(335),
routine: Some(routine_msg.to_string()),
},
PgErrorMsg::parse(&complete_msg).unwrap()
);
let mut only_mandatory_msg = vec![];
only_mandatory_msg.extend(field('S', severity));
only_mandatory_msg.extend(field('V', severity));
only_mandatory_msg.extend(field('C', error_code));
only_mandatory_msg.extend(field('M', message));
only_mandatory_msg.extend(field('D', detail_msg));
let err_fields = PgErrorMsg::parse(&only_mandatory_msg).unwrap();
info!("only mandatory fields: {}", &err_fields);
error!(
"server error: {}: {}",
err_fields.severity, err_fields.message
);
assert_eq!(
PgErrorMsg {
severity_localized: severity.to_string(),
severity: severity.to_string(),
code: error_code.to_string(),
message: message.to_string(),
detail: Some(detail_msg.to_string()),
hint: None,
position: None,
internal_position: None,
internal_query: None,
where_context: None,
schema_name: None,
table_name: None,
column_name: None,
data_type_name: None,
constraint_name: None,
file_name: None,
line: None,
routine: None,
},
PgErrorMsg::parse(&only_mandatory_msg).unwrap()
);
}
pub fn prepared_statement_name() -> String {
format!(
"P_{}",
PREPARED_STATEMENT_COUNTER.fetch_add(1, Ordering::SeqCst)
)
}

View File

@@ -23,15 +23,14 @@ impl MirroredClient {
async fn create_pool(&self) -> Pool<ServerPool> {
let config = get_config();
let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
let (connection_timeout, idle_timeout, _cfg, prepared_statement_cache_size) =
let (connection_timeout, idle_timeout, _cfg) =
match config.pools.get(&self.address.pool_name) {
Some(cfg) => (
cfg.connect_timeout.unwrap_or(default),
cfg.idle_timeout.unwrap_or(default),
cfg.clone(),
cfg.prepared_statements_cache_size,
),
None => (default, default, crate::config::Pool::default(), 0),
None => (default, default, crate::config::Pool::default()),
};
let manager = ServerPool::new(
@@ -42,8 +41,6 @@ impl MirroredClient {
Arc::new(RwLock::new(None)),
None,
true,
false,
prepared_statement_cache_size,
);
Pool::builder()
@@ -81,13 +78,12 @@ impl MirroredClient {
}
// Incoming data from server (we read to clear the socket buffer and discard the data)
recv_result = server.recv(None) => {
recv_result = server.recv() => {
match recv_result {
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()).as_str()
);
server.mark_bad();
error!("Failed to receive from mirror {:?} {:?}", err, address.clone());
}
}
}
@@ -99,9 +95,8 @@ impl MirroredClient {
match server.send(&BytesMut::from(&bytes[..])).await {
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to receive from mirror {:?} {:?}", err, address.clone()).as_str()
);
server.mark_bad();
error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone())
}
}
}
@@ -141,18 +136,18 @@ impl MirroringManager {
bytes_rx,
disconnect_rx: exit_rx,
};
exit_senders.push(exit_tx);
byte_senders.push(bytes_tx);
exit_senders.push(exit_tx.clone());
byte_senders.push(bytes_tx.clone());
client.start();
});
Self {
byte_senders,
byte_senders: byte_senders,
disconnect_senders: exit_senders,
}
}
pub fn send(&mut self, bytes: &BytesMut) {
pub fn send(self: &mut Self, bytes: &BytesMut) {
// We want to avoid performing an allocation if we won't be able to send the message
// There is a possibility of a race here where we check the capacity and then the channel is
// closed or the capacity is reduced to 0, but mirroring is best effort anyway
@@ -174,7 +169,7 @@ impl MirroringManager {
});
}
pub fn disconnect(&mut self) {
pub fn disconnect(self: &mut Self) {
self.disconnect_senders
.iter_mut()
.for_each(|sender| match sender.try_send(()) {

View File

@@ -92,7 +92,7 @@ impl<'a> Plugin for Intercept<'a> {
.map(|s| {
let s = s.as_str().to_string();
if s.is_empty() {
if s == "" {
None
} else {
Some(s)

View File

@@ -33,7 +33,6 @@ pub enum PluginOutput {
#[async_trait]
pub trait Plugin {
// Run before the query is sent to the server.
#[allow(clippy::ptr_arg)]
async fn run(
&mut self,
query_router: &QueryRouter,

View File

@@ -20,7 +20,7 @@ impl<'a> Prewarmer<'a> {
self.server.address(),
query
);
self.server.query(query).await?;
self.server.query(&query).await?;
}
Ok(())

View File

@@ -31,7 +31,7 @@ impl<'a> Plugin for QueryLogger<'a> {
.map(|q| q.to_string())
.collect::<Vec<String>>()
.join("; ");
info!("[pool: {}][user: {}] {}", self.db, self.user, query);
info!("[pool: {}][user: {}] {}", self.user, self.db, query);
Ok(PluginOutput::Allow)
}

View File

@@ -34,7 +34,7 @@ impl<'a> Plugin for TableAccess<'a> {
visit_relations(ast, |relation| {
let relation = relation.to_string();
let parts = relation.split('.').collect::<Vec<&str>>();
let parts = relation.split(".").collect::<Vec<&str>>();
let table_name = parts.last().unwrap();
if self.tables.contains(&table_name.to_string()) {

View File

@@ -1,9 +1,9 @@
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bb8::{ManageConnection, Pool, PooledConnection, QueueStrategy};
use bytes::{BufMut, BytesMut};
use chrono::naive::NaiveDateTime;
use log::{debug, error, info, warn};
use lru::LruCache;
use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
use rand::seq::SliceRandom;
@@ -11,8 +11,6 @@ use rand::thread_rng;
use regex::Regex;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicU64;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
@@ -21,14 +19,13 @@ use std::time::Instant;
use tokio::sync::Notify;
use crate::config::{
get_config, Address, DefaultShard, General, LoadBalancingMode, Plugins, PoolMode, Role, User,
get_config, Address, General, LoadBalancingMode, Plugins, PoolMode, Role, User,
};
use crate::errors::Error;
use crate::auth_passthrough::AuthPassthrough;
use crate::messages::Parse;
use crate::plugins::prewarmer;
use crate::server::{Server, ServerParameters};
use crate::server::Server;
use crate::sharding::ShardingFunction;
use crate::stats::{AddressStats, ClientStats, ServerStats};
@@ -57,57 +54,6 @@ pub enum BanReason {
AdminBan(i64),
}
pub type PreparedStatementCacheType = Arc<Mutex<PreparedStatementCache>>;
// TODO: Add stats the this cache
// TODO: Add application name to the cache value to help identify which application is using the cache
// TODO: Create admin command to show which statements are in the cache
#[derive(Debug)]
pub struct PreparedStatementCache {
cache: LruCache<u64, Arc<Parse>>,
}
impl PreparedStatementCache {
pub fn new(mut size: usize) -> Self {
// Cannot be zeros
if size == 0 {
size = 1;
}
PreparedStatementCache {
cache: LruCache::new(NonZeroUsize::new(size).unwrap()),
}
}
/// Adds the prepared statement to the cache if it doesn't exist with a new name
/// if it already exists will give you the existing parse
///
/// Pass the hash to this so that we can do the compute before acquiring the lock
pub fn get_or_insert(&mut self, parse: &Parse, hash: u64) -> Arc<Parse> {
match self.cache.get(&hash) {
Some(rewritten_parse) => rewritten_parse.clone(),
None => {
let new_parse = Arc::new(parse.clone().rewrite());
let evicted = self.cache.push(hash, new_parse.clone());
if let Some((_, evicted_parse)) = evicted {
debug!(
"Evicted prepared statement {} from cache",
evicted_parse.name
);
}
new_parse
}
}
}
/// Marks the hash as most recently used if it exists
pub fn promote(&mut self, hash: &u64) {
self.cache.promote(hash);
}
}
/// An identifier for a PgCat pool,
/// a database visible to clients.
#[derive(Hash, Debug, Clone, PartialEq, Eq, Default)]
@@ -165,12 +111,6 @@ pub struct PoolSettings {
// Enable/disable query parser.
pub query_parser_enabled: bool,
// Max length of query the parser will parse.
pub query_parser_max_length: Option<usize>,
// Infer role
pub query_parser_read_write_splitting: bool,
// Read from the primary as well or not.
pub primary_reads_enabled: bool,
@@ -195,9 +135,6 @@ pub struct PoolSettings {
// Regex for searching for the shard id in SQL statements
pub shard_id_regex: Option<Regex>,
// What to do when no shard is selected in a sharded system
pub default_shard: DefaultShard,
// Limit how much of each query is searched for a potential shard regex match
pub regex_search_limit: usize,
@@ -220,8 +157,6 @@ impl Default for PoolSettings {
db: String::default(),
default_role: None,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: true,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
@@ -231,7 +166,6 @@ impl Default for PoolSettings {
sharding_key_regex: None,
shard_id_regex: None,
regex_search_limit: 1000,
default_shard: DefaultShard::Shard(0),
auth_query: None,
auth_query_user: None,
auth_query_password: None,
@@ -244,23 +178,23 @@ impl Default for PoolSettings {
#[derive(Clone, Debug, Default)]
pub struct ConnectionPool {
/// The pools handled internally by bb8.
databases: Arc<Vec<Vec<Pool<ServerPool>>>>,
databases: Vec<Vec<Pool<ServerPool>>>,
/// The addresses (host, port, role) to handle
/// failover and load balancing deterministically.
addresses: Arc<Vec<Vec<Address>>>,
addresses: Vec<Vec<Address>>,
/// List of banned addresses (see above)
/// that should not be queried.
banlist: BanList,
/// The server information has to be passed to the
/// The server information (K messages) have to be passed to the
/// clients on startup. We pre-connect to all shards and replicas
/// on pool creation and save the startup parameters here.
original_server_parameters: Arc<RwLock<ServerParameters>>,
/// on pool creation and save the K messages here.
server_info: Arc<RwLock<BytesMut>>,
/// Pool configuration.
pub settings: Arc<PoolSettings>,
pub settings: PoolSettings,
/// If not validated, we need to double check the pool is available before allowing a client
/// to use it.
@@ -277,9 +211,6 @@ pub struct ConnectionPool {
/// AuthInfo
pub auth_hash: Arc<RwLock<Option<String>>>,
/// Cache
pub prepared_statement_cache: Option<PreparedStatementCacheType>,
}
impl ConnectionPool {
@@ -298,17 +229,20 @@ impl ConnectionPool {
let old_pool_ref = get_pool(pool_name, &user.username);
let identifier = PoolIdentifier::new(pool_name, &user.username);
if let Some(pool) = old_pool_ref {
// If the pool hasn't changed, get existing reference and insert it into the new_pools.
// We replace all pools at the end, but if the reference is kept, the pool won't get re-created (bb8).
if pool.config_hash == new_pool_hash_value {
info!(
"[pool: {}][user: {}] has not changed",
pool_name, user.username
);
new_pools.insert(identifier.clone(), pool.clone());
continue;
match old_pool_ref {
Some(pool) => {
// If the pool hasn't changed, get existing reference and insert it into the new_pools.
// We replace all pools at the end, but if the reference is kept, the pool won't get re-created (bb8).
if pool.config_hash == new_pool_hash_value {
info!(
"[pool: {}][user: {}] has not changed",
pool_name, user.username
);
new_pools.insert(identifier.clone(), pool.clone());
continue;
}
}
None => (),
}
info!(
@@ -358,7 +292,6 @@ impl ConnectionPool {
pool_name: pool_name.clone(),
mirrors: vec![],
stats: Arc::new(AddressStats::default()),
error_count: Arc::new(AtomicU64::new(0)),
});
address_id += 1;
}
@@ -377,7 +310,6 @@ impl ConnectionPool {
pool_name: pool_name.clone(),
mirrors: mirror_addresses,
stats: Arc::new(AddressStats::default()),
error_count: Arc::new(AtomicU64::new(0)),
};
address_id += 1;
@@ -432,24 +364,16 @@ impl ConnectionPool {
None => config.plugins.clone(),
},
pool_config.cleanup_server_connections,
pool_config.log_client_parameter_status_changes,
pool_config.prepared_statements_cache_size,
);
let connect_timeout = match user.connect_timeout {
let connect_timeout = match pool_config.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => match pool_config.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => config.general.connect_timeout,
},
None => config.general.connect_timeout,
};
let idle_timeout = match user.idle_timeout {
let idle_timeout = match pool_config.idle_timeout {
Some(idle_timeout) => idle_timeout,
None => match pool_config.idle_timeout {
Some(idle_timeout) => idle_timeout,
None => config.general.idle_timeout,
},
None => config.general.idle_timeout,
};
let server_lifetime = match user.server_lifetime {
@@ -460,7 +384,7 @@ impl ConnectionPool {
},
};
let reaper_rate = *[idle_timeout, server_lifetime, POOL_REAPER_RATE]
let reaper_rate = *vec![idle_timeout, server_lifetime, POOL_REAPER_RATE]
.iter()
.min()
.unwrap();
@@ -509,13 +433,13 @@ impl ConnectionPool {
}
let pool = ConnectionPool {
databases: Arc::new(shards),
addresses: Arc::new(addresses),
databases: shards,
addresses,
banlist: Arc::new(RwLock::new(banlist)),
config_hash: new_pool_hash_value,
original_server_parameters: Arc::new(RwLock::new(ServerParameters::new())),
server_info: Arc::new(RwLock::new(BytesMut::new())),
auth_hash: pool_auth_hash,
settings: Arc::new(PoolSettings {
settings: PoolSettings {
pool_mode: match user.pool_mode {
Some(pool_mode) => pool_mode,
None => pool_config.pool_mode,
@@ -532,9 +456,6 @@ impl ConnectionPool {
_ => unreachable!(),
},
query_parser_enabled: pool_config.query_parser_enabled,
query_parser_max_length: pool_config.query_parser_max_length,
query_parser_read_write_splitting: pool_config
.query_parser_read_write_splitting,
primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
@@ -550,7 +471,6 @@ impl ConnectionPool {
.clone()
.map(|regex| Regex::new(regex.as_str()).unwrap()),
regex_search_limit: pool_config.regex_search_limit.unwrap_or(1000),
default_shard: pool_config.default_shard,
auth_query: pool_config.auth_query.clone(),
auth_query_user: pool_config.auth_query_user.clone(),
auth_query_password: pool_config.auth_query_password.clone(),
@@ -558,23 +478,17 @@ impl ConnectionPool {
Some(ref plugins) => Some(plugins.clone()),
None => config.plugins.clone(),
},
}),
},
validated: Arc::new(AtomicBool::new(false)),
paused: Arc::new(AtomicBool::new(false)),
paused_waiter: Arc::new(Notify::new()),
prepared_statement_cache: match pool_config.prepared_statements_cache_size {
0 => None,
_ => Some(Arc::new(Mutex::new(PreparedStatementCache::new(
pool_config.prepared_statements_cache_size,
)))),
},
};
// Connect to the servers to make sure pool configuration is valid
// before setting it globally.
// Do this async and somewhere else, we don't have to wait here.
if config.general.validate_config {
let validate_pool = pool.clone();
let mut validate_pool = pool.clone();
tokio::task::spawn(async move {
let _ = validate_pool.validate().await;
});
@@ -595,7 +509,7 @@ impl ConnectionPool {
/// when they connect.
/// This also warms up the pool for clients that connect when
/// the pooler starts up.
pub async fn validate(&self) -> Result<(), Error> {
pub async fn validate(&mut self) -> Result<(), Error> {
let mut futures = Vec::new();
let validated = Arc::clone(&self.validated);
@@ -603,7 +517,7 @@ impl ConnectionPool {
for server in 0..self.servers(shard) {
let databases = self.databases.clone();
let validated = Arc::clone(&validated);
let pool_server_parameters = Arc::clone(&self.original_server_parameters);
let pool_server_info = Arc::clone(&self.server_info);
let task = tokio::task::spawn(async move {
let connection = match databases[shard][server].get().await {
@@ -616,10 +530,11 @@ impl ConnectionPool {
let proxy = connection;
let server = &*proxy;
let server_parameters: ServerParameters = server.server_parameters();
let server_info = server.server_info();
let mut guard = pool_server_parameters.write();
*guard = server_parameters;
let mut guard = pool_server_info.write();
guard.clear();
guard.put(server_info.clone());
validated.store(true, Ordering::Relaxed);
});
@@ -631,7 +546,7 @@ impl ConnectionPool {
// TODO: compare server information to make sure
// all shards are running identical configurations.
if !self.validated() {
if self.server_info.read().is_empty() {
error!("Could not validate connection pool");
return Err(Error::AllServersDown);
}
@@ -678,51 +593,19 @@ impl ConnectionPool {
/// Get a connection from the pool.
pub async fn get(
&self,
shard: Option<usize>, // shard number
shard: usize, // shard number
role: Option<Role>, // primary or replica
client_stats: &ClientStats, // client id
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
let effective_shard_id = if self.shards() == 1 {
// The base, unsharded case
Some(0)
} else {
if !self.valid_shard_id(shard) {
// None is valid shard ID so it is safe to unwrap here
return Err(Error::InvalidShardId(shard.unwrap()));
}
shard
};
let mut candidates = self
.addresses
let mut candidates: Vec<&Address> = self.addresses[shard]
.iter()
.flatten()
.filter(|address| address.role == role)
.collect::<Vec<&Address>>();
.collect();
// We start with a shuffled list of addresses even if we end up resorting
// this is meant to avoid hitting instance 0 everytime if the sorting metric
// ends up being the same for all instances
// We shuffle even if least_outstanding_queries is used to avoid imbalance
// in cases where all candidates have more or less the same number of outstanding
// queries
candidates.shuffle(&mut thread_rng());
match effective_shard_id {
Some(shard_id) => candidates.retain(|address| address.shard == shard_id),
None => match self.settings.default_shard {
DefaultShard::Shard(shard_id) => {
candidates.retain(|address| address.shard == shard_id)
}
DefaultShard::Random => (),
DefaultShard::RandomHealthy => {
candidates.sort_by(|a, b| {
b.error_count
.load(Ordering::Relaxed)
.partial_cmp(&a.error_count.load(Ordering::Relaxed))
.unwrap()
});
}
},
};
if self.settings.load_balancing_mode == LoadBalancingMode::LeastOutstandingConnections {
candidates.sort_by(|a, b| {
self.busy_connection_count(b)
@@ -745,7 +628,7 @@ impl ConnectionPool {
let mut force_healthcheck = false;
if self.is_banned(address) {
if self.try_unban(address).await {
if self.try_unban(&address).await {
force_healthcheck = true;
} else {
debug!("Address {:?} is banned", address);
@@ -758,10 +641,7 @@ impl ConnectionPool {
.get()
.await
{
Ok(conn) => {
address.reset_error_count();
conn
}
Ok(conn) => conn,
Err(err) => {
error!(
"Connection checkout error for instance {:?}, error: {:?}",
@@ -787,7 +667,7 @@ impl ConnectionPool {
// since we last checked the server is ok.
// Health checks are pretty expensive.
if !require_healthcheck {
let checkout_time = now.elapsed().as_micros() as u64;
let checkout_time: u64 = now.elapsed().as_micros() as u64;
client_stats.checkout_time(checkout_time);
server
.stats()
@@ -801,7 +681,7 @@ impl ConnectionPool {
.run_health_check(address, server, now, client_stats)
.await
{
let checkout_time = now.elapsed().as_micros() as u64;
let checkout_time: u64 = now.elapsed().as_micros() as u64;
client_stats.checkout_time(checkout_time);
server
.stats()
@@ -813,12 +693,7 @@ impl ConnectionPool {
continue;
}
}
client_stats.idle();
let checkout_time = now.elapsed().as_micros() as u64;
client_stats.checkout_time(checkout_time);
Err(Error::AllServersDown)
}
@@ -871,28 +746,16 @@ impl ConnectionPool {
}
// Don't leave a bad connection in the pool.
server.mark_bad("failed health check");
server.mark_bad();
self.ban(address, BanReason::FailedHealthCheck, Some(client_info));
false
self.ban(&address, BanReason::FailedHealthCheck, Some(client_info));
return false;
}
/// Ban an address (i.e. replica). It no longer will serve
/// traffic for any new transactions. Existing transactions on that replica
/// will finish successfully or error out to the clients.
pub fn ban(&self, address: &Address, reason: BanReason, client_info: Option<&ClientStats>) {
// Count the number of errors since the last successful checkout
// This is used to determine if the shard is down
match reason {
BanReason::FailedHealthCheck
| BanReason::FailedCheckout
| BanReason::MessageSendFailed
| BanReason::MessageReceiveFailed => {
address.increment_error_count();
}
_ => (),
};
// Primary can never be banned
if address.role == Role::Primary {
return;
@@ -998,10 +861,10 @@ impl ConnectionPool {
let guard = self.banlist.read();
for banlist in guard.iter() {
for (address, (reason, timestamp)) in banlist.iter() {
bans.push((address.clone(), (reason.clone(), *timestamp)));
bans.push((address.clone(), (reason.clone(), timestamp.clone())));
}
}
bans
return bans;
}
/// Get the address from the host url
@@ -1043,11 +906,10 @@ impl ConnectionPool {
&self.addresses[shard][server]
}
pub fn server_parameters(&self) -> ServerParameters {
self.original_server_parameters.read().clone()
pub fn server_info(&self) -> BytesMut {
self.server_info.read().clone()
}
/// Get the number of checked out connection for an address
fn busy_connection_count(&self, address: &Address) -> u32 {
let state = self.pool_state(address.shard, address.address_index);
let idle = state.idle_connections;
@@ -1059,37 +921,7 @@ impl ConnectionPool {
}
let busy = provisioned - idle;
debug!("{:?} has {:?} busy connections", address, busy);
busy
}
fn valid_shard_id(&self, shard: Option<usize>) -> bool {
match shard {
None => true,
Some(shard) => shard < self.shards(),
}
}
/// Register a parse statement to the pool's cache and return the rewritten parse
///
/// Do not pass an anonymous parse statement to this function
pub fn register_parse_to_cache(&self, hash: u64, parse: &Parse) -> Option<Arc<Parse>> {
// We should only be calling this function if the cache is enabled
match self.prepared_statement_cache {
Some(ref prepared_statement_cache) => {
let mut cache = prepared_statement_cache.lock();
Some(cache.get_or_insert(parse, hash))
}
None => None,
}
}
/// Promote a prepared statement hash in the LRU
pub fn promote_prepared_statement_hash(&self, hash: &u64) {
// We should only be calling this function if the cache is enabled
if let Some(ref prepared_statement_cache) = self.prepared_statement_cache {
let mut cache = prepared_statement_cache.lock();
cache.promote(hash);
}
return busy;
}
}
@@ -1115,16 +947,9 @@ pub struct ServerPool {
/// Should we clean up dirty connections before putting them into the pool?
cleanup_connections: bool,
/// Log client parameter status changes
log_client_parameter_status_changes: bool,
/// Prepared statement cache size
prepared_statement_cache_size: usize,
}
impl ServerPool {
#[allow(clippy::too_many_arguments)]
pub fn new(
address: Address,
user: User,
@@ -1133,19 +958,15 @@ impl ServerPool {
auth_hash: Arc<RwLock<Option<String>>>,
plugins: Option<Plugins>,
cleanup_connections: bool,
log_client_parameter_status_changes: bool,
prepared_statement_cache_size: usize,
) -> ServerPool {
ServerPool {
address,
user,
user: user.clone(),
database: database.to_string(),
client_server_map,
auth_hash,
plugins,
cleanup_connections,
log_client_parameter_status_changes,
prepared_statement_cache_size,
}
}
}
@@ -1175,8 +996,6 @@ impl ManageConnection for ServerPool {
stats.clone(),
self.auth_hash.clone(),
self.cleanup_connections,
self.log_client_parameter_status_changes,
self.prepared_statement_cache_size,
)
.await
{

File diff suppressed because it is too large Load Diff

View File

@@ -79,12 +79,12 @@ impl ScramSha256 {
let server_message = Message::parse(message)?;
if !server_message.nonce.starts_with(&self.nonce) {
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
return Err(Error::ProtocolSyncError(format!("SCRAM")));
}
let salt = match general_purpose::STANDARD.decode(&server_message.salt) {
Ok(salt) => salt,
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
};
let salted_password = Self::hi(
@@ -166,9 +166,9 @@ impl ScramSha256 {
pub fn finish(&mut self, message: &BytesMut) -> Result<(), Error> {
let final_message = FinalMessage::parse(message)?;
let verifier = match general_purpose::STANDARD.decode(final_message.value) {
let verifier = match general_purpose::STANDARD.decode(&final_message.value) {
Ok(verifier) => verifier,
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
};
let mut hmac = match Hmac::<Sha256>::new_from_slice(&self.salted_password) {
@@ -230,14 +230,14 @@ impl Message {
.collect::<Vec<String>>();
if parts.len() != 3 {
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
return Err(Error::ProtocolSyncError(format!("SCRAM")));
}
let nonce = str::replace(&parts[0], "r=", "");
let salt = str::replace(&parts[1], "s=", "");
let iterations = match str::replace(&parts[2], "i=", "").parse::<u32>() {
Ok(iterations) => iterations,
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
};
Ok(Message {
@@ -257,7 +257,7 @@ impl FinalMessage {
/// Parse the server final validation message.
pub fn parse(message: &BytesMut) -> Result<FinalMessage, Error> {
if !message.starts_with(b"v=") || message.len() < 4 {
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
return Err(Error::ProtocolSyncError(format!("SCRAM")));
}
Ok(FinalMessage {

View File

@@ -3,14 +3,11 @@
use bytes::{Buf, BufMut, BytesMut};
use fallible_iterator::FallibleIterator;
use log::{debug, error, info, trace, warn};
use lru::LruCache;
use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
use postgres_protocol::message;
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use std::collections::{BTreeSet, HashMap};
use std::io::Read;
use std::net::IpAddr;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
@@ -18,11 +15,10 @@ use tokio::net::TcpStream;
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{client::TlsStream, TlsConnector};
use crate::config::{get_config, Address, User};
use crate::config::{get_config, get_prepared_statements_cache_size, Address, User};
use crate::constants::*;
use crate::dns_cache::{AddrSet, CACHED_RESOLVER};
use crate::errors::{Error, ServerIdentifier};
use crate::messages::BytesMutReader;
use crate::messages::*;
use crate::mirrors::MirroringManager;
use crate::pool::ClientServerMap;
@@ -109,10 +105,10 @@ impl StreamInner {
#[derive(Copy, Clone)]
struct CleanupState {
/// If server connection requires RESET ALL before checkin because of set statement
/// If server connection requires DISCARD ALL before checkin because of set statement
needs_cleanup_set: bool,
/// If server connection requires DEALLOCATE ALL before checkin because of prepare statement
/// If server connection requires DISCARD ALL before checkin because of prepare statement
needs_cleanup_prepare: bool,
}
@@ -149,120 +145,6 @@ impl std::fmt::Display for CleanupState {
}
}
static TRACKED_PARAMETERS: Lazy<HashSet<String>> = Lazy::new(|| {
let mut set = HashSet::new();
set.insert("client_encoding".to_string());
set.insert("DateStyle".to_string());
set.insert("TimeZone".to_string());
set.insert("standard_conforming_strings".to_string());
set.insert("application_name".to_string());
set
});
#[derive(Debug, Clone)]
pub struct ServerParameters {
parameters: HashMap<String, String>,
}
impl Default for ServerParameters {
fn default() -> Self {
Self::new()
}
}
impl ServerParameters {
pub fn new() -> Self {
let mut server_parameters = ServerParameters {
parameters: HashMap::new(),
};
server_parameters.set_param("client_encoding".to_string(), "UTF8".to_string(), false);
server_parameters.set_param("DateStyle".to_string(), "ISO, MDY".to_string(), false);
server_parameters.set_param("TimeZone".to_string(), "Etc/UTC".to_string(), false);
server_parameters.set_param(
"standard_conforming_strings".to_string(),
"on".to_string(),
false,
);
server_parameters.set_param("application_name".to_string(), "pgcat".to_string(), false);
server_parameters
}
/// returns true if a tracked parameter was set, false if it was a non-tracked parameter
/// if startup is false, then then only tracked parameters will be set
pub fn set_param(&mut self, mut key: String, value: String, startup: bool) {
// The startup parameter will send uncapitalized keys but parameter status packets will send capitalized keys
if key == "timezone" {
key = "TimeZone".to_string();
} else if key == "datestyle" {
key = "DateStyle".to_string();
};
if TRACKED_PARAMETERS.contains(&key) || startup {
self.parameters.insert(key, value);
}
}
pub fn set_from_hashmap(&mut self, parameters: &HashMap<String, String>, startup: bool) {
// iterate through each and call set_param
for (key, value) in parameters {
self.set_param(key.to_string(), value.to_string(), startup);
}
}
// Gets the diff of the parameters
fn compare_params(&self, incoming_parameters: &ServerParameters) -> HashMap<String, String> {
let mut diff = HashMap::new();
// iterate through tracked parameters
for key in TRACKED_PARAMETERS.iter() {
if let Some(incoming_value) = incoming_parameters.parameters.get(key) {
if let Some(value) = self.parameters.get(key) {
if value != incoming_value {
diff.insert(key.to_string(), incoming_value.to_string());
}
}
}
}
diff
}
pub fn get_application_name(&self) -> &String {
// Can unwrap because we set it in the constructor
self.parameters.get("application_name").unwrap()
}
fn add_parameter_message(key: &str, value: &str, buffer: &mut BytesMut) {
buffer.put_u8(b'S');
// 4 is len of i32, the plus for the null terminator
let len = 4 + key.len() + 1 + value.len() + 1;
buffer.put_i32(len as i32);
buffer.put_slice(key.as_bytes());
buffer.put_u8(0);
buffer.put_slice(value.as_bytes());
buffer.put_u8(0);
}
}
impl From<&ServerParameters> for BytesMut {
fn from(server_parameters: &ServerParameters) -> Self {
let mut bytes = BytesMut::new();
for (key, value) in &server_parameters.parameters {
ServerParameters::add_parameter_message(key, value, &mut bytes);
}
bytes
}
}
// pub fn compare
/// Server state.
pub struct Server {
/// Server host, e.g. localhost,
@@ -276,7 +158,7 @@ pub struct Server {
buffer: BytesMut,
/// Server information the server sent us over on startup.
server_parameters: ServerParameters,
server_info: BytesMut,
/// Backend id and secret key used for query cancellation.
process_id: i32,
@@ -294,7 +176,7 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,
/// If server connection requires reset statements before checkin
/// If server connection requires DISCARD ALL before checkin
cleanup_state: CleanupState,
/// Mapping of clients and servers used for query cancellation.
@@ -320,20 +202,13 @@ pub struct Server {
/// Should clean up dirty connections?
cleanup_connections: bool,
/// Log client parameter status changes
log_client_parameter_status_changes: bool,
/// Prepared statements
prepared_statement_cache: Option<LruCache<String, ()>>,
/// Prepared statement being currently registered on the server.
registering_prepared_statement: VecDeque<String>,
prepared_statements: BTreeSet<String>,
}
impl Server {
/// Pretend to be the Postgres client and connect to the server given host, port and credentials.
/// Perform the authentication and return the server in a ready for query state.
#[allow(clippy::too_many_arguments)]
pub async fn startup(
address: &Address,
user: &User,
@@ -342,8 +217,6 @@ impl Server {
stats: Arc<ServerStats>,
auth_hash: Arc<RwLock<Option<String>>>,
cleanup_connections: bool,
log_client_parameter_status_changes: bool,
prepared_statement_cache_size: usize,
) -> Result<Server, Error> {
let cached_resolver = CACHED_RESOLVER.load();
let mut addr_set: Option<AddrSet> = None;
@@ -443,7 +316,10 @@ impl Server {
// Something else?
m => {
return Err(Error::SocketError(format!("Unknown message: {}", { m })));
return Err(Error::SocketError(format!(
"Unknown message: {}",
m as char
)));
}
}
} else {
@@ -461,22 +337,27 @@ impl Server {
None => &user.username,
};
let password = match user.server_password.as_ref() {
Some(server_password) => Some(server_password),
None => user.password.as_ref(),
let password = match user.server_password {
Some(ref server_password) => Some(server_password),
None => match user.password {
Some(ref password) => Some(password),
None => None,
},
};
startup(&mut stream, username, database).await?;
let mut server_info = BytesMut::new();
let mut process_id: i32 = 0;
let mut secret_key: i32 = 0;
let server_identifier = ServerIdentifier::new(username, database);
let server_identifier = ServerIdentifier::new(username, &database);
// We'll be handling multiple packets, but they will all be structured the same.
// We'll loop here until this exchange is complete.
let mut scram: Option<ScramSha256> = password.map(|password| ScramSha256::new(password));
let mut server_parameters = ServerParameters::new();
let mut scram: Option<ScramSha256> = match password {
Some(password) => Some(ScramSha256::new(password)),
None => None,
};
loop {
let code = match stream.read_u8().await {
@@ -707,7 +588,8 @@ impl Server {
// An error message will be present.
_ => {
let mut error = vec![0u8; len as usize];
// Read the error message without the terminating null character.
let mut error = vec![0u8; len as usize - 4 - 1];
match stream.read_exact(&mut error).await {
Ok(_) => (),
@@ -719,14 +601,10 @@ impl Server {
}
};
let fields = match PgErrorMsg::parse(&error) {
Ok(f) => f,
Err(err) => {
return Err(err);
}
};
trace!("error fields: {}", &fields);
error!("server error: {}: {}", fields.severity, fields.message);
// TODO: the error message contains multiple fields; we can decode them and
// present a prettier message to the user.
// See: https://www.postgresql.org/docs/12/protocol-error-fields.html
error!("Server error: {}", String::from_utf8_lossy(&error));
}
};
@@ -735,10 +613,9 @@ impl Server {
// ParameterStatus
'S' => {
let mut bytes = BytesMut::with_capacity(len as usize - 4);
bytes.resize(len as usize - mem::size_of::<i32>(), b'0');
let mut param = vec![0u8; len as usize - 4];
match stream.read_exact(&mut bytes[..]).await {
match stream.read_exact(&mut param).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ServerStartupError(
@@ -748,13 +625,12 @@ impl Server {
}
};
let key = bytes.read_string().unwrap();
let value = bytes.read_string().unwrap();
// Save the parameter so we can pass it to the client later.
// These can be server_encoding, client_encoding, server timezone, Postgres version,
// and many more interesting things we should know about the Postgres server we are talking to.
server_parameters.set_param(key, value, true);
server_info.put_u8(b'S');
server_info.put_i32(len);
server_info.put_slice(&param[..]);
}
// BackendKeyData
@@ -796,11 +672,11 @@ impl Server {
}
};
let server = Server {
let mut server = Server {
address: address.clone(),
stream: BufStream::new(stream),
buffer: BytesMut::with_capacity(8196),
server_parameters,
server_info,
process_id,
secret_key,
in_transaction: false,
@@ -812,7 +688,7 @@ impl Server {
addr_set,
connected_at: chrono::offset::Utc::now().naive_utc(),
stats,
application_name: "pgcat".to_string(),
application_name: String::new(),
last_activity: SystemTime::now(),
mirror_manager: match address.mirrors.len() {
0 => None,
@@ -823,16 +699,11 @@ impl Server {
)),
},
cleanup_connections,
log_client_parameter_status_changes,
prepared_statement_cache: match prepared_statement_cache_size {
0 => None,
_ => Some(LruCache::new(
NonZeroUsize::new(prepared_statement_cache_size).unwrap(),
)),
},
registering_prepared_statement: VecDeque::new(),
prepared_statements: BTreeSet::new(),
};
server.set_name("pgcat").await?;
return Ok(server);
}
@@ -882,7 +753,7 @@ impl Server {
self.mirror_send(messages);
self.stats().data_sent(messages.len());
match write_all_flush(&mut self.stream, messages).await {
match write_all_flush(&mut self.stream, &messages).await {
Ok(_) => {
// Successfully sent to server
self.last_activity = SystemTime::now();
@@ -902,10 +773,7 @@ impl Server {
/// Receive data from the server in response to a client request.
/// This method must be called multiple times while `self.is_data_available()` is true
/// in order to receive all data the server has to offer.
pub async fn recv(
&mut self,
mut client_server_parameters: Option<&mut ServerParameters>,
) -> Result<BytesMut, Error> {
pub async fn recv(&mut self) -> Result<BytesMut, Error> {
loop {
let mut message = match read_message(&mut self.stream).await {
Ok(message) => message,
@@ -960,6 +828,7 @@ impl Server {
// There is no more data available from the server.
self.data_available = false;
break;
}
@@ -968,37 +837,6 @@ impl Server {
if self.in_copy_mode {
self.in_copy_mode = false;
}
// Remove the prepared statement from the cache, it has a syntax error or something else bad happened.
if let Some(prepared_stmt_name) =
self.registering_prepared_statement.pop_front()
{
if let Some(ref mut cache) = self.prepared_statement_cache {
if let Some(_removed) = cache.pop(&prepared_stmt_name) {
debug!(
"Removed {} from prepared statement cache",
prepared_stmt_name
);
} else {
// Shouldn't happen.
debug!("Prepared statement {} was not cached", prepared_stmt_name);
}
}
}
if self.prepared_statement_cache.is_some() {
let error_message = PgErrorMsg::parse(&message)?;
if error_message.message == "cached plan must not change result type" {
warn!("Server {:?} changed schema, dropping connection to clean up prepared statements", self.address);
// This will still result in an error to the client, but this server connection will drop all cached prepared statements
// so that any new queries will be re-prepared
// TODO: Other ideas to solve errors when there are DDL changes after a statement has been prepared
// - Recreate entire connection pool to force recreation of all server connections
// - Clear the ConnectionPool's statement cache so that new statement names are generated
// - Implement a retry (re-prepare) so the client doesn't see an error
self.cleanup_state.needs_cleanup_prepare = true;
}
}
}
// CommandComplete
@@ -1007,24 +845,24 @@ impl Server {
self.in_copy_mode = false;
}
match message.read_string() {
Ok(command) => {
let mut command_tag = String::new();
match message.reader().read_to_string(&mut command_tag) {
Ok(_) => {
// Non-exhaustive list of commands that are likely to change session variables/resources
// which can leak between clients. This is a best effort to block bad clients
// from poisoning a transaction-mode pool by setting inappropriate session variables
match command.as_str() {
"SET" => {
match command_tag.as_str() {
"SET\0" => {
// We don't detect set statements in transactions
// No great way to differentiate between set and set local
// As a result, we will miss cases when set statements are used in transactions
// This will reduce amount of reset statements sent
// This will reduce amount of discard statements sent
if !self.in_transaction {
debug!("Server connection marked for clean up");
self.cleanup_state.needs_cleanup_set = true;
}
}
"PREPARE" => {
"PREPARE\0" => {
debug!("Server connection marked for clean up");
self.cleanup_state.needs_cleanup_prepare = true;
}
@@ -1038,20 +876,6 @@ impl Server {
}
}
'S' => {
let key = message.read_string().unwrap();
let value = message.read_string().unwrap();
if let Some(client_server_parameters) = client_server_parameters.as_mut() {
client_server_parameters.set_param(key.clone(), value.clone(), false);
if self.log_client_parameter_status_changes {
info!("Client parameter status change: {} = {}", key, value)
}
}
self.server_parameters.set_param(key, value, false);
}
// DataRow
'D' => {
// More data is available after this message, this is not the end of the reply.
@@ -1088,11 +912,6 @@ impl Server {
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
'c' => (),
// Parse complete successfully
'1' => {
self.registering_prepared_statement.pop_front();
}
// Anything else, e.g. errors, notices, etc.
// Keep buffering until ReadyForQuery shows up.
_ => (),
@@ -1114,103 +933,117 @@ impl Server {
Ok(bytes)
}
// Determines if the server already has a prepared statement with the given name
// Increments the prepared statement cache hit counter
pub fn has_prepared_statement(&mut self, name: &str) -> bool {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
None => return false,
};
/// Add the prepared statement to being tracked by this server.
/// The client is processing data that will create a prepared statement on this server.
pub fn will_prepare(&mut self, name: &str) {
debug!("Will prepare `{}`", name);
let has_it = cache.get(name).is_some();
if has_it {
self.stats.prepared_cache_hit();
} else {
self.stats.prepared_cache_miss();
}
has_it
self.prepared_statements.insert(name.to_string());
self.stats.prepared_cache_add();
}
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
None => return None,
};
/// Check if we should prepare a statement on the server.
pub fn should_prepare(&self, name: &str) -> bool {
let should_prepare = !self.prepared_statements.contains(name);
debug!("Should prepare `{}`: {}", name, should_prepare);
if should_prepare {
self.stats.prepared_cache_miss();
} else {
self.stats.prepared_cache_hit();
}
should_prepare
}
/// Create a prepared statement on the server.
pub async fn prepare(&mut self, parse: &Parse) -> Result<(), Error> {
debug!("Preparing `{}`", parse.name);
let bytes: BytesMut = parse.try_into()?;
self.send(&bytes).await?;
self.send(&flush()).await?;
// Read and discard ParseComplete (B)
match read_message(&mut self.stream).await {
Ok(_) => (),
Err(err) => {
self.bad = true;
return Err(err);
}
}
self.prepared_statements.insert(parse.name.to_string());
self.stats.prepared_cache_add();
// If we evict something, we need to close it on the server
if let Some((evicted_name, _)) = cache.push(name.to_string(), ()) {
if evicted_name != name {
debug!(
"Evicted prepared statement {} from cache, replaced with {}",
evicted_name, name
);
return Some(evicted_name);
}
};
debug!("Prepared `{}`", parse.name);
None
Ok(())
}
fn remove_prepared_statement_from_cache(&mut self, name: &str) {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
None => return,
};
/// Maintain adequate cache size on the server.
pub async fn maintain_cache(&mut self) -> Result<(), Error> {
debug!("Cache maintenance run");
self.stats.prepared_cache_remove();
cache.pop(name);
}
let max_cache_size = get_prepared_statements_cache_size();
let mut names = Vec::new();
pub async fn register_prepared_statement(
&mut self,
parse: &Parse,
should_send_parse_to_server: bool,
) -> Result<(), Error> {
if !self.has_prepared_statement(&parse.name) {
self.registering_prepared_statement
.push_back(parse.name.clone());
let mut bytes = BytesMut::new();
if should_send_parse_to_server {
let parse_bytes: BytesMut = parse.try_into()?;
bytes.extend_from_slice(&parse_bytes);
while self.prepared_statements.len() >= max_cache_size {
// The prepared statmeents are alphanumerically sorted by the BTree.
// FIFO.
if let Some(name) = self.prepared_statements.pop_last() {
names.push(name);
}
// If we evict something, we need to close it on the server
// We do this by adding it to the messages we're sending to the server before the sync
if let Some(evicted_name) = self.add_prepared_statement_to_cache(&parse.name) {
self.remove_prepared_statement_from_cache(&evicted_name);
let close_bytes: BytesMut = Close::new(&evicted_name).try_into()?;
bytes.extend_from_slice(&close_bytes);
};
// If we have a parse or close we need to send to the server, send them and sync
if !bytes.is_empty() {
bytes.extend_from_slice(&sync());
self.send(&bytes).await?;
loop {
self.recv(None).await?;
if !self.is_data_available() {
break;
}
}
}
};
// If it's not there, something went bad, I'm guessing bad syntax or permissions error
// on the server.
if !self.has_prepared_statement(&parse.name) {
Err(Error::PreparedStatementError)
} else {
Ok(())
}
if !names.is_empty() {
self.deallocate(names).await?;
}
Ok(())
}
/// Remove the prepared statement from being tracked by this server.
/// The client is processing data that will cause the server to close the prepared statement.
pub fn will_close(&mut self, name: &str) {
debug!("Will close `{}`", name);
self.prepared_statements.remove(name);
}
/// Close a prepared statement on the server.
pub async fn deallocate(&mut self, names: Vec<String>) -> Result<(), Error> {
for name in &names {
debug!("Deallocating prepared statement `{}`", name);
let close = Close::new(name);
let bytes: BytesMut = close.try_into()?;
self.send(&bytes).await?;
}
if !names.is_empty() {
self.send(&flush()).await?;
}
// Read and discard CloseComplete (3)
for name in &names {
match read_message(&mut self.stream).await {
Ok(_) => {
self.prepared_statements.remove(name);
self.stats.prepared_cache_remove();
debug!("Closed `{}`", name);
}
Err(err) => {
self.bad = true;
return Err(err);
}
};
}
Ok(())
}
/// If the server is still inside a transaction.
@@ -1220,7 +1053,6 @@ impl Server {
self.in_transaction
}
/// Currently copying data from client to server or vice-versa.
pub fn in_copy_mode(&self) -> bool {
self.in_copy_mode
}
@@ -1254,33 +1086,14 @@ impl Server {
}
/// Get server startup information to forward it to the client.
pub fn server_parameters(&self) -> ServerParameters {
self.server_parameters.clone()
}
pub async fn sync_parameters(&mut self, parameters: &ServerParameters) -> Result<(), Error> {
let parameter_diff = self.server_parameters.compare_params(parameters);
if parameter_diff.is_empty() {
return Ok(());
}
let mut query = String::from("");
for (key, value) in parameter_diff {
query.push_str(&format!("SET {} TO '{}';", key, value));
}
let res = self.query(&query).await;
self.cleanup_state.reset();
res
/// Not used at the moment.
pub fn server_info(&self) -> BytesMut {
self.server_info.clone()
}
/// Indicate that this server connection cannot be re-used and must be discarded.
pub fn mark_bad(&mut self, reason: &str) {
error!("Server {:?} marked bad, reason: {}", self.address, reason);
pub fn mark_bad(&mut self) {
error!("Server {:?} marked bad", self.address);
self.bad = true;
}
@@ -1309,7 +1122,7 @@ impl Server {
self.send(&query).await?;
loop {
let _ = self.recv(None).await?;
let _ = self.recv().await?;
if !self.data_available {
break;
@@ -1334,25 +1147,12 @@ impl Server {
// Client disconnected but it performed session-altering operations such as
// SET statement_timeout to 1 or create a prepared statement. We clear that
// to avoid leaking state between clients. For performance reasons we only
// send `RESET ALL` if we think the session is altered instead of just sending
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
let mut reset_string = String::from("RESET ROLE;");
if self.cleanup_state.needs_cleanup_set {
reset_string.push_str("RESET ALL;");
};
if self.cleanup_state.needs_cleanup_prepare {
reset_string.push_str("DEALLOCATE ALL;");
// Since we deallocated all prepared statements, we need to clear the cache
if let Some(cache) = &mut self.prepared_statement_cache {
cache.clear();
}
};
self.query(&reset_string).await?;
self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?;
self.cleanup_state.reset();
}
@@ -1363,6 +1163,24 @@ impl Server {
Ok(())
}
/// A shorthand for `SET application_name = $1`.
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
if self.application_name != name {
self.application_name = name.to_string();
// We don't want `SET application_name` to mark the server connection
// as needing cleanup
let needs_cleanup_before = self.cleanup_state;
let result = Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?);
self.cleanup_state = needs_cleanup_before;
result
} else {
Ok(())
}
}
/// get Server stats
pub fn stats(&self) -> Arc<ServerStats> {
self.stats.clone()
@@ -1379,20 +1197,22 @@ impl Server {
self.last_activity
}
// Marks a connection as needing cleanup at checkin
// Marks a connection as needing DISCARD ALL at checkin
pub fn mark_dirty(&mut self) {
self.cleanup_state.set_true();
}
pub fn mirror_send(&mut self, bytes: &BytesMut) {
if let Some(manager) = self.mirror_manager.as_mut() {
manager.send(bytes)
match self.mirror_manager.as_mut() {
Some(manager) => manager.send(bytes),
None => (),
}
}
pub fn mirror_disconnect(&mut self) {
if let Some(manager) = self.mirror_manager.as_mut() {
manager.disconnect()
match self.mirror_manager.as_mut() {
Some(manager) => manager.disconnect(),
None => (),
}
}
@@ -1414,15 +1234,13 @@ impl Server {
Arc::new(ServerStats::default()),
Arc::new(RwLock::new(None)),
true,
false,
0,
)
.await?;
debug!("Connected!, sending query.");
server.send(&simple_query(query)).await?;
let mut message = server.recv(None).await?;
let mut message = server.recv().await?;
parse_query_message(&mut message).await
Ok(parse_query_message(&mut message).await?)
}
}

View File

@@ -64,7 +64,7 @@ impl Sharder {
fn sha1(&self, key: i64) -> usize {
let mut hasher = Sha1::new();
hasher.update(key.to_string().as_bytes());
hasher.update(&key.to_string().as_bytes());
let result = hasher.finalize();
@@ -202,10 +202,10 @@ mod test {
#[test]
fn test_sha1_hash() {
let sharder = Sharder::new(12, ShardingFunction::Sha1);
let ids = [
let ids = vec![
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let shards = [
let shards = vec![
4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3,
];

View File

@@ -86,11 +86,11 @@ impl PoolStats {
}
}
map
return map;
}
pub fn generate_header() -> Vec<(&'static str, DataType)> {
vec![
return vec![
("database", DataType::Text),
("user", DataType::Text),
("pool_mode", DataType::Text),
@@ -105,11 +105,11 @@ impl PoolStats {
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
]
];
}
pub fn generate_row(&self) -> Vec<String> {
vec![
return vec![
self.identifier.db.clone(),
self.identifier.user.clone(),
self.mode.to_string(),
@@ -124,7 +124,7 @@ impl PoolStats {
self.sv_login.to_string(),
(self.maxwait / 1_000_000).to_string(),
(self.maxwait % 1_000_000).to_string(),
]
];
}
}

View File

@@ -49,7 +49,6 @@ pub struct ServerStats {
pub error_count: Arc<AtomicU64>,
pub prepared_hit_count: Arc<AtomicU64>,
pub prepared_miss_count: Arc<AtomicU64>,
pub prepared_eviction_count: Arc<AtomicU64>,
pub prepared_cache_size: Arc<AtomicU64>,
}
@@ -69,7 +68,6 @@ impl Default for ServerStats {
reporter: get_reporter(),
prepared_hit_count: Arc::new(AtomicU64::new(0)),
prepared_miss_count: Arc::new(AtomicU64::new(0)),
prepared_eviction_count: Arc::new(AtomicU64::new(0)),
prepared_cache_size: Arc::new(AtomicU64::new(0)),
}
}
@@ -223,7 +221,6 @@ impl ServerStats {
}
pub fn prepared_cache_remove(&self) {
self.prepared_eviction_count.fetch_add(1, Ordering::Relaxed);
self.prepared_cache_size.fetch_sub(1, Ordering::Relaxed);
}
}

View File

@@ -1,13 +1,8 @@
FROM rust:bullseye
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y
RUN cargo install cargo-binutils rustfilt
RUN rustup component add llvm-tools-preview
RUN sudo gem install bundler
RUN wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i toxiproxy-2.4.0.deb
RUN wget -O go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm go1.21.3.linux-$(dpkg --print-architecture).tar.gz

View File

@@ -1,5 +0,0 @@
module pgcat
go 1.21
require github.com/lib/pq v1.10.9

View File

@@ -1,2 +0,0 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

View File

@@ -1,162 +0,0 @@
#
# PgCat config example.
#
#
# General pooler settings
[general]
# What IP to run on, 0.0.0.0 means accessible from everywhere.
host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = "${PORT}"
# Whether to enable prometheus exporter or not.
enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms).
connect_timeout = 1000
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000
# How long to keep connection available for immediate re-use, without running a healthcheck query on it
healthcheck_delay = 30000
# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 5000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
# If we should log client connections
log_client_connections = false
# If we should log client disconnections
log_client_disconnections = false
# Reload config automatically if it changes.
autoreload = 15000
server_round_robin = false
# TLS
tls_certificate = "../../.circleci/server.cert"
tls_private_key = "../../.circleci/server.key"
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
admin_username = "admin_user"
admin_password = "admin_pass"
# pool
# configs are structured as pool.<pool_name>
# the pool_name is what clients use as database name when connecting
# For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded_db"
[pools.sharded_db]
# Pool mode (see PgBouncer docs for more).
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"
# Query parser. If enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"
# Prepared statements cache size.
prepared_statements_cache_size = 500
# Credentials for users that may connect to this cluster
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
# Maximum number of server connections that can be established for this user
# The maximum number of connection from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users.
pool_size = 5
statement_timeout = 0
[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 30000
# Shard 0
[pools.sharded_db.shards.0]
# [ host, port, role ]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
# Database name (e.g. "postgres")
database = "shard0"
[pools.sharded_db.shards.1]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard1"
[pools.sharded_db.shards.2]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard2"
[pools.simple_db]
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"
[pools.simple_db.users.0]
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 30000
[pools.simple_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
database = "some_db"

View File

@@ -1,52 +0,0 @@
package pgcat
import (
"context"
"database/sql"
"fmt"
_ "github.com/lib/pq"
"testing"
)
func Test(t *testing.T) {
t.Cleanup(setup(t))
t.Run("Named parameterized prepared statement works", namedParameterizedPreparedStatement)
t.Run("Unnamed parameterized prepared statement works", unnamedParameterizedPreparedStatement)
}
func namedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
stmt, err := db.Prepare("SELECT $1")
if err != nil {
t.Fatalf("could not prepare: %+v", err)
}
for i := 0; i < 100; i++ {
rows, err := stmt.Query(1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}
func unnamedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
for i := 0; i < 100; i++ {
// Under the hood QueryContext generates an unnamed parameterized prepared statement
rows, err := db.QueryContext(context.Background(), "SELECT $1", 1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}

View File

@@ -1,81 +0,0 @@
package pgcat
import (
"context"
"database/sql"
_ "embed"
"fmt"
"math/rand"
"os"
"os/exec"
"strings"
"testing"
"time"
)
//go:embed pgcat.toml
var pgcatCfg string
var port = rand.Intn(32760-20000) + 20000
func setup(t *testing.T) func() {
cfg, err := os.CreateTemp("/tmp", "pgcat_cfg_*.toml")
if err != nil {
t.Fatalf("could not create temp file: %+v", err)
}
pgcatCfg = strings.Replace(pgcatCfg, "\"${PORT}\"", fmt.Sprintf("%d", port), 1)
_, err = cfg.Write([]byte(pgcatCfg))
if err != nil {
t.Fatalf("could not write temp file: %+v", err)
}
commandPath := "../../target/debug/pgcat"
if os.Getenv("CARGO_TARGET_DIR") != "" {
commandPath = os.Getenv("CARGO_TARGET_DIR") + "/debug/pgcat"
}
cmd := exec.Command(commandPath, cfg.Name())
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
go func() {
err = cmd.Run()
if err != nil {
t.Errorf("could not run pgcat: %+v", err)
}
}()
deadline, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancelFunc()
for {
select {
case <-deadline.Done():
break
case <-time.After(50 * time.Millisecond):
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=pgcat user=admin_user password=admin_pass sslmode=disable", port))
if err != nil {
continue
}
rows, err := db.QueryContext(deadline, "SHOW STATS")
if err != nil {
continue
}
_ = rows.Close()
_ = db.Close()
break
}
break
}
return func() {
err := cmd.Process.Signal(os.Interrupt)
if err != nil {
t.Fatalf("could not interrupt pgcat: %+v", err)
}
err = os.Remove(cfg.Name())
if err != nil {
t.Fatalf("could not remove temp file: %+v", err)
}
}
}

View File

@@ -36,4 +36,4 @@ SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
SET SERVER ROLE TO 'replica';
-- Read load balancing
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;

View File

@@ -90,28 +90,4 @@ describe "Admin" do
expect(results["pool_mode"]).to eq("transaction")
end
end
describe "PAUSE" do
it "pauses all pools" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])
admin_conn.async_exec("PAUSE")
results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["1"])
admin_conn.async_exec("RESUME")
results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])
end
it "handles errors" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
expect { admin_conn.async_exec("PAUSE foo").to_a }.to raise_error(PG::SystemError)
expect { admin_conn.async_exec("PAUSE foo,bar").to_a }.to raise_error(PG::SystemError)
end
end
end

View File

@@ -185,7 +185,7 @@ describe "Auth Query" do
},
}
}
}
}
context 'and with cleartext passwords set' do
it 'it uses local passwords' do

View File

@@ -33,18 +33,18 @@ module Helpers
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_i, "primary"],
["localhost", replica.port.to_i, "replica"],
["localhost", primary.port.to_s, "primary"],
["localhost", replica.port.to_s, "replica"],
]
},
},
"users" => { "0" => user.merge(config_user) }
}
}
pgcat_cfg["general"]["port"] = pgcat.port.to_i
pgcat_cfg["general"]["port"] = pgcat.port
pgcat.update_config(pgcat_cfg)
pgcat.start
pgcat.wait_until_ready(
pgcat.connection_string(
"sharded_db",
@@ -92,13 +92,13 @@ module Helpers
"0" => {
"database" => database,
"servers" => [
["localhost", primary.port.to_i, "primary"],
["localhost", replica.port.to_i, "replica"],
["localhost", primary.port.to_s, "primary"],
["localhost", replica.port.to_s, "replica"],
]
},
},
"users" => { "0" => user.merge(config_user) }
}
}
end
# Main proxy configs
pgcat_cfg["pools"] = {
@@ -109,7 +109,7 @@ module Helpers
pgcat_cfg["general"]["port"] = pgcat.port
pgcat.update_config(pgcat_cfg.deep_merge(extra_conf))
pgcat.start
pgcat.wait_until_ready(pgcat.connection_string("sharded_db0", pg_user['username'], pg_user['password']))
OpenStruct.new.tap do |struct|

View File

@@ -7,24 +7,10 @@ class PgInstance
attr_reader :password
attr_reader :database_name
def self.mass_takedown(databases)
raise StandardError "block missing" unless block_given?
databases.each do |database|
database.toxiproxy.toxic(:limit_data, bytes: 1).toxics.each(&:save)
end
sleep 0.1
yield
ensure
databases.each do |database|
database.toxiproxy.toxics.each(&:destroy)
end
end
def initialize(port, username, password, database_name)
@original_port = port.to_i
@original_port = port
@toxiproxy_port = 10000 + port.to_i
@port = @toxiproxy_port.to_i
@port = @toxiproxy_port
@username = username
@password = password
@@ -62,9 +48,9 @@ class PgInstance
def take_down
if block_given?
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 1).apply { yield }
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
else
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 1).toxics.each(&:save)
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
end
end
@@ -103,6 +89,6 @@ class PgInstance
end
def count_select_1_plus_2
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query LIKE '%SELECT $1 + $2%'")[0]["sum"].to_i }
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
end
end

View File

@@ -34,13 +34,12 @@ module Helpers
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => true,
"query_parser_enabled" => true,
"query_parser_read_write_splitting" => true,
"automatic_sharding_key" => "data.id",
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_i, "primary"]] },
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_i, "primary"]] },
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_i, "primary"]] },
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] },
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
},
"users" => { "0" => user },
"plugins" => {
@@ -100,7 +99,7 @@ module Helpers
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_i, "primary"]
["localhost", primary.port.to_s, "primary"]
]
},
},
@@ -146,10 +145,10 @@ module Helpers
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_i, "primary"],
["localhost", replica0.port.to_i, "replica"],
["localhost", replica1.port.to_i, "replica"],
["localhost", replica2.port.to_i, "replica"]
["localhost", primary.port.to_s, "primary"],
["localhost", replica0.port.to_s, "replica"],
["localhost", replica1.port.to_s, "replica"],
["localhost", replica2.port.to_s, "replica"]
]
},
},

View File

@@ -1,10 +1,8 @@
require 'pg'
require 'json'
require 'tempfile'
require 'toml'
require 'fileutils'
require 'securerandom'
class ConfigReloadFailed < StandardError; end
class PgcatProcess
attr_reader :port
attr_reader :pid
@@ -20,7 +18,7 @@ class PgcatProcess
end
def initialize(log_level)
@env = {}
@env = {"RUST_LOG" => log_level}
@port = rand(20000..32760)
@log_level = log_level
@log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log"
@@ -32,7 +30,7 @@ class PgcatProcess
'../../target/debug/pgcat'
end
@command = "#{command_path} #{@config_filename} --log-level #{@log_level}"
@command = "#{command_path} #{@config_filename}"
FileUtils.cp("../../pgcat.toml", @config_filename)
cfg = current_config
@@ -48,34 +46,22 @@ class PgcatProcess
def update_config(config_hash)
@original_config = current_config
Tempfile.create('json_out', '/tmp') do |f|
f.write(config_hash.to_json)
f.flush
`cat #{f.path} | yj -jt > #{@config_filename}`
end
output_to_write = TOML::Generator.new(config_hash).body
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,')
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*\]/, ',\1]')
File.write(@config_filename, output_to_write)
end
def current_config
JSON.parse(`cat #{@config_filename} | yj -tj`)
end
def raw_config_file
File.read(@config_filename)
loadable_string = File.read(@config_filename)
loadable_string = loadable_string.gsub(/,\s*(\d+)\s*,/, ', "\1",')
loadable_string = loadable_string.gsub(/,\s*(\d+)\s*\]/, ', "\1"]')
TOML.load(loadable_string)
end
def reload_config
conn = PG.connect(admin_connection_string)
conn.async_exec("RELOAD")
rescue PG::ConnectionBad => e
errors = logs.split("Reloading config").last
errors = errors.gsub(/\e\[([;\d]+)?m/, '') # Remove color codes
errors = errors.
split("\n").select{|line| line.include?("ERROR") }.
map { |line| line.split("pgcat::config: ").last }
raise ConfigReloadFailed, errors.join("\n")
ensure
conn&.close
`kill -s HUP #{@pid}`
sleep 0.5
end
def start
@@ -126,16 +112,10 @@ class PgcatProcess
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/pgcat"
end
def connection_string(pool_name, username, password = nil, parameters: {})
def connection_string(pool_name, username, password = nil)
cfg = current_config
user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username }
connection_string = "postgresql://#{username}:#{password || user_obj["password"]}@0.0.0.0:#{@port}/#{pool_name}"
# Add the additional parameters to the connection string
parameter_string = parameters.map { |key, value| "#{key}=#{value}" }.join("&")
connection_string += "?#{parameter_string}" unless parameter_string.empty?
connection_string
"postgresql://#{username}:#{password || user_obj["password"]}@0.0.0.0:#{@port}/#{pool_name}"
end
def example_connection_string

View File

@@ -11,9 +11,9 @@ describe "Query Mirroing" do
before do
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
]
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
@@ -31,8 +31,7 @@ describe "Query Mirroing" do
runs.times { conn.async_exec("SELECT 1 + 2") }
sleep 0.5
expect(processes.all_databases.first.count_select_1_plus_2).to eq(runs)
# Allow some slack in mirroring successes
expect(mirror_pg.count_select_1_plus_2).to be > ((runs - 5) * 3)
expect(mirror_pg.count_select_1_plus_2).to eq(runs * 3)
end
context "when main server connection is closed" do
@@ -43,9 +42,9 @@ describe "Query Mirroing" do
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["idle_timeout"] = 5000 + i
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
]
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config

View File

@@ -221,7 +221,7 @@ describe "Miscellaneous" do
conn.close
end
it "Does not send RESET ALL unless necessary" do
it "Does not send DISCARD ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
@@ -229,7 +229,7 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -239,7 +239,7 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(10)
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
end
it "Resets server roles correctly" do
@@ -252,7 +252,7 @@ describe "Miscellaneous" do
end
expect(processes.primary.count_query("RESET ROLE")).to eq(10)
end
end
end
context "transaction mode" do
@@ -273,7 +273,7 @@ describe "Miscellaneous" do
end
end
it "Does not send RESET ALL unless necessary" do
it "Does not send DISCARD ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
@@ -282,7 +282,7 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -292,32 +292,8 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(10)
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
end
it "Respects tracked parameters on startup" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user", parameters: { "application_name" => "my_pgcat_test" }))
expect(conn.async_exec("SHOW application_name")[0]["application_name"]).to eq("my_pgcat_test")
conn.close
end
it "Respect tracked parameter on set statemet" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET application_name to 'my_pgcat_test'")
expect(conn.async_exec("SHOW application_name")[0]["application_name"]).to eq("my_pgcat_test")
end
it "Ignore untracked parameter on set statemet" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
orignal_statement_timeout = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
conn.async_exec("SET statement_timeout to 1500")
expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq(orignal_statement_timeout)
end
end
context "transaction mode with transactions" do
@@ -331,7 +307,7 @@ describe "Miscellaneous" do
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -341,7 +317,7 @@ describe "Miscellaneous" do
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
end
end
@@ -354,7 +330,8 @@ describe "Miscellaneous" do
conn.async_exec("SET statement_timeout TO 1000")
conn.close
expect(processes.primary.count_query("RESET ALL")).to eq(0)
puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
end
it "will not clean up prepared statements" do
@@ -364,7 +341,8 @@ describe "Miscellaneous" do
conn.close
expect(processes.primary.count_query("RESET ALL")).to eq(0)
puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
end
end
end
@@ -374,9 +352,10 @@ describe "Miscellaneous" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
puts(current_configs["general"]["idle_client_in_transaction_timeout"])
current_configs["general"]["idle_client_in_transaction_timeout"] = 0
processes.pgcat.update_config(current_configs) # with timeout 0
processes.pgcat.reload_config
end
@@ -394,9 +373,9 @@ describe "Miscellaneous" do
context "idle transaction timeout set to 500ms" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
current_configs["general"]["idle_client_in_transaction_timeout"] = 500
processes.pgcat.update_config(current_configs) # with timeout 500
processes.pgcat.reload_config
end
@@ -415,7 +394,7 @@ describe "Miscellaneous" do
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(1) # above 500ms
expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
conn.async_exec("SELECT 1") # should be able to send another query
conn.close
end

View File

@@ -1,214 +1,29 @@
require_relative 'spec_helper'
describe 'Prepared statements' do
let(:pool_size) { 5 }
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", pool_size) }
let(:prepared_statements_cache_size) { 100 }
let(:server_round_robin) { false }
let(:processes) { Helpers::Pgcat.three_shard_setup('sharded_db', 5) }
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["server_round_robin"] = server_round_robin
new_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = prepared_statements_cache_size
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = pool_size
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
end
context 'when trying prepared statements' do
it 'it allows unparameterized statements to succeed' do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
conn2 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
prepared_query = "SELECT 1"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
conn1.exec_prepared('statement1')
conn2.transaction do
# Claim server 1 with client 2
conn2.exec("SELECT 2")
# Client 1 now runs the prepared query, and it's automatically
# prepared on server 2
conn1.prepare('statement2', prepared_query)
conn1.exec_prepared('statement2')
# Client 2 now prepares the same query that was already
# prepared on server 1. And PgBouncer reuses that already
# prepared query for this different client.
conn2.prepare('statement3', prepared_query)
conn2.exec_prepared('statement3')
end
ensure
conn1.close if conn1
conn2.close if conn2
end
it 'it allows parameterized statements to succeed' do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
conn2 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
prepared_query = "SELECT $1"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
conn1.exec_prepared('statement1', [1])
conn2.transaction do
# Claim server 1 with client 2
conn2.exec("SELECT 2")
# Client 1 now runs the prepared query, and it's automatically
# prepared on server 2
conn1.prepare('statement2', prepared_query)
conn1.exec_prepared('statement2', [1])
# Client 2 now prepares the same query that was already
# prepared on server 1. And PgBouncer reuses that already
# prepared query for this different client.
conn2.prepare('statement3', prepared_query)
conn2.exec_prepared('statement3', [1])
end
ensure
conn1.close if conn1
conn2.close if conn2
end
end
context 'when trying large packets' do
it "works with large parse" do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
long_string = "1" * 4096 * 10
prepared_query = "SELECT '#{long_string}'"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
result = conn1.exec_prepared('statement1')
# assert result matches long_string
expect(result.getvalue(0, 0)).to eq(long_string)
ensure
conn1.close if conn1
end
it "works with large bind" do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
long_string = "1" * 4096 * 10
prepared_query = "SELECT $1::text"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
result = conn1.exec_prepared('statement1', [long_string])
# assert result matches long_string
expect(result.getvalue(0, 0)).to eq(long_string)
ensure
conn1.close if conn1
end
end
context 'when statement cache is smaller than set of unqiue statements' do
let(:prepared_statements_cache_size) { 1 }
let(:pool_size) { 1 }
it "evicts all but 1 statement from the server cache" do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
5.times do |i|
prepared_query = "SELECT '#{i}'"
conn.prepare("statement#{i}", prepared_query)
result = conn.exec_prepared("statement#{i}")
expect(result.getvalue(0, 0)).to eq(i.to_s)
end
# Check number of prepared statements (expected: 1)
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(1)
end
end
context 'when statement cache is larger than set of unqiue statements' do
let(:pool_size) { 1 }
it "does not evict any of the statements from the cache" do
# cache size 5
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
5.times do |i|
prepared_query = "SELECT '#{i}'"
conn.prepare("statement#{i}", prepared_query)
result = conn.exec_prepared("statement#{i}")
expect(result.getvalue(0, 0)).to eq(i.to_s)
end
# Check number of prepared statements (expected: 1)
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(5)
end
end
context 'when preparing the same query' do
let(:prepared_statements_cache_size) { 5 }
let(:pool_size) { 5 }
it "reuses statement cache when there are different statement names on the same connection" do
context 'enabled' do
it 'will work over the same connection' do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
10.times do |i|
statement_name = "statement_#{i}"
conn.prepare(statement_name, 'SELECT $1::int')
conn.exec_prepared(statement_name, [1])
conn.describe_prepared(statement_name)
end
# Check number of prepared statements (expected: 1)
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(1)
end
it "reuses statement cache when there are different statement names on different connections" do
10.times do |i|
it 'will work with new connections' do
10.times do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
statement_name = "statement_#{i}"
conn.prepare(statement_name, 'SELECT $1::int')
conn.exec_prepared(statement_name, [1])
statement_name = 'statement1'
conn.prepare('statement1', 'SELECT $1::int')
conn.exec_prepared('statement1', [1])
conn.describe_prepared('statement1')
end
# Check number of prepared statements (expected: 1)
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(1)
end
end
context 'when reloading config' do
let(:pool_size) { 1 }
it "test_reload_config" do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
# prepare query
conn.prepare('statement1', 'SELECT 1')
conn.exec_prepared('statement1')
# Reload config which triggers pool recreation
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = prepared_statements_cache_size + 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
# check that we're starting with no prepared statements on the server
conn_check = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
n_statements = conn_check.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(0)
# still able to run prepared query
conn.exec_prepared('statement1')
end
end
end

View File

@@ -7,11 +7,11 @@ describe "Sharding" do
before do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
# Setup the sharding data
3.times do |i|
conn.exec("SET SHARD TO '#{i}'")
conn.exec("DELETE FROM data WHERE id > 0") rescue nil
conn.exec("DELETE FROM data WHERE id > 0")
end
18.times do |i|
@@ -19,11 +19,10 @@ describe "Sharding" do
conn.exec("SET SHARDING KEY TO '#{i}'")
conn.exec("INSERT INTO data (id, value) VALUES (#{i}, 'value_#{i}')")
end
conn.close
end
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
@@ -49,148 +48,4 @@ describe "Sharding" do
end
end
end
describe "no_shard_specified_behavior config" do
context "when default shard number is invalid" do
it "prevents config reload" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
current_configs = processes.pgcat.current_config
current_configs["pools"]["sharded_db"]["default_shard"] = "shard_99"
processes.pgcat.update_config(current_configs)
expect { processes.pgcat.reload_config }.to raise_error(ConfigReloadFailed, /Invalid shard 99/)
end
end
end
describe "comment-based routing" do
context "when no configs are set" do
it "routes queries with a shard_id comment to the default shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
10.times { conn.async_exec("/* shard_id: 2 */ SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([10, 0, 0])
end
it "does not honor no_shard_specified_behavior directives" do
end
end
[
["shard_id_regex", "/\\* the_shard_id: (\\d+) \\*/", "/* the_shard_id: 1 */"],
["sharding_key_regex", "/\\* the_sharding_key: (\\d+) \\*/", "/* the_sharding_key: 3 */"],
].each do |config_name, config_value, comment_to_use|
context "when #{config_name} config is set" do
let(:no_shard_specified_behavior) { nil }
before do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
current_configs = processes.pgcat.current_config
current_configs["pools"]["sharded_db"][config_name] = config_value
if no_shard_specified_behavior
current_configs["pools"]["sharded_db"]["default_shard"] = no_shard_specified_behavior
else
current_configs["pools"]["sharded_db"].delete("default_shard")
end
processes.pgcat.update_config(current_configs)
processes.pgcat.reload_config
end
it "routes queries with a shard_id comment to the correct shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
context "when no_shard_specified_behavior config is set to random" do
let(:no_shard_specified_behavior) { "random" }
context "with no shard comment" do
it "sends queries to random shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2).all?(&:positive?)).to be true
end
end
context "with a shard comment" do
it "honors the comment" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
end
end
context "when no_shard_specified_behavior config is set to random_healthy" do
let(:no_shard_specified_behavior) { "random_healthy" }
context "with no shard comment" do
it "sends queries to random healthy shard" do
good_databases = [processes.all_databases[0], processes.all_databases[2]]
bad_database = processes.all_databases[1]
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
250.times { conn.async_exec("SELECT 99") }
bad_database.take_down do
250.times do
conn.async_exec("SELECT 99")
rescue PG::ConnectionBad => e
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
end
end
# Routes traffic away from bad shard
25.times { conn.async_exec("SELECT 1 + 2") }
expect(good_databases.map(&:count_select_1_plus_2).all?(&:positive?)).to be true
expect(bad_database.count_select_1_plus_2).to eq(0)
# Routes traffic to the bad shard if the shard_id is specified
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
bad_database = processes.all_databases[1]
expect(bad_database.count_select_1_plus_2).to eq(25)
end
end
context "with a shard comment" do
it "honors the comment" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
end
end
context "when no_shard_specified_behavior config is set to shard_x" do
let(:no_shard_specified_behavior) { "shard_2" }
context "with no shard comment" do
it "sends queries to the specified shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 0, 25])
end
end
context "with a shard comment" do
it "honors the comment" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
end
end
end
end
end
end

View File

@@ -329,40 +329,6 @@ describe "Stats" do
admin_conn.close
connections.map(&:close)
end
context "when client has waited for a server" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "shows correct maxwait" do
threads = []
connections = Array.new(3) { |i| PG::connect("#{pgcat_conn_str}?application_name=app#{i}") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW CLIENTS")
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
non_waiting_clients = normal_client_results.select { |c| c["maxwait"] == "0" }
waiting_clients = normal_client_results.select { |c| c["maxwait"].to_i > 0 }
expect(non_waiting_clients.count).to eq(2)
non_waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_between(0, 50_000)
end
expect(waiting_clients.count).to eq(1)
waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_within(200_000).of(500_000)
end
admin_conn.close
connections.map(&:close)
end
end
end

4
tests/rust/Cargo.lock generated
View File

@@ -1206,9 +1206,9 @@ dependencies = [
[[package]]
name = "webpki"
version = "0.22.2"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ecc0cd7cac091bf682ec5efa18b1cff79d617b84181f38b3951dbe135f607f"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",

View File

@@ -16,14 +16,7 @@ async fn test_prepared_statements() {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for _ in 0..1000 {
match sqlx::query("SELECT one").fetch_all(&pool).await {
Ok(_) => (),
Err(err) => {
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err);
}
}
}
sqlx::query("SELECT 1").fetch_all(&pool).await.unwrap();
}
});

View File

@@ -17,21 +17,14 @@ cargo build --release
rm -rf "$deb_dir"
mkdir -p "$deb_dir/DEBIAN"
mkdir -p "$deb_dir/usr/bin"
mkdir -p "$deb_dir/etc/systemd/system"
mkdir -p "$deb_dir/etc"
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
chmod +x "$deb_dir/usr/bin/pgcat"
cp pgcat.toml "$deb_dir/etc/pgcat.example.toml"
cp pgcat.service "$deb_dir/etc/systemd/system/pgcat.service"
cp pgcat.toml "$deb_dir/etc/pgcat.toml"
(cat control | envsubst) > "$deb_dir/DEBIAN/control"
cp postinst "$deb_dir/DEBIAN/postinst"
cp postrm "$deb_dir/DEBIAN/postrm"
cp prerm "$deb_dir/DEBIAN/prerm"
chmod +x ${deb_dir}/DEBIAN/post*
chmod +x ${deb_dir}/DEBIAN/pre*
dpkg-deb \
--root-owner-group \