Compare commits

..

5 Commits

Author SHA1 Message Date
Mostafa
3fc9e5dec1 Merge branch 'main' of github.com:postgresml/pgcat into mostafa_fix_prepared_stmts 2024-09-03 18:11:32 -05:00
Mostafa
f7c5c0faf9 fix bind 2024-09-01 16:14:44 -05:00
Mostafa
982d03c374 fix syntax 2024-09-01 15:41:33 -05:00
Mostafa
686b7ca7c5 Fixes 2024-09-01 15:31:27 -05:00
Mostafa
7c55bf78fe Add failing tests 2024-09-01 14:39:05 -05:00
39 changed files with 757 additions and 1818 deletions

View File

@@ -26,7 +26,6 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Start Toxiproxy
kill -9 $(pgrep toxiproxy) || true
LOG_LEVEL=error toxiproxy-server &
sleep 1
@@ -107,7 +106,7 @@ cd ../..
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
#
pip3 install -r tests/python/requirements.txt
pytest || exit 1
python3 tests/python/tests.py || exit 1
#
@@ -178,6 +177,3 @@ killall pgcat -s SIGINT
# Allow for graceful shutdown
sleep 1
kill -9 $(pgrep toxiproxy)
sleep 1

View File

@@ -20,9 +20,9 @@ jobs:
version: v3.8.1
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
# yamllint (https://github.com/adrienverge/yamllint) which require Python
# yamllint (https://github.com/adrienverge/yamllint) which require Python
- name: Set up Python
uses: actions/setup-python@v5.1.0
uses: actions/setup-python@v4.1.0
with:
python-version: 3.7
@@ -43,7 +43,7 @@ jobs:
run: ct lint --config ct.yaml
- name: Create kind cluster
uses: helm/kind-action@v1.10.0
uses: helm/kind-action@v1.7.0
if: steps.list-changed.outputs.changed == 'true'
- name: Run chart-testing (install)

View File

@@ -32,7 +32,7 @@ jobs:
version: v3.13.0
- name: Run chart-releaser
uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
with:
charts_dir: charts
config: cr.yaml

View File

@@ -1,9 +1,6 @@
name: pgcat package (deb)
on:
push:
tags:
- v*
workflow_dispatch:
inputs:
packageVersion:
@@ -19,14 +16,6 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Set package version
if: github.event_name == 'push' # For push event
run: |
TAG=${{ github.ref_name }}
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
- name: Set package version (manual dispatch)
if: github.event_name == 'workflow_dispatch' # For manual dispatch
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
@@ -50,10 +39,10 @@ jobs:
export ARCH=arm64
fi
bash utilities/deb.sh ${{ env.packageVersion }}
bash utilities/deb.sh ${{ inputs.packageVersion }}
deb-s3 upload \
--lock \
--bucket apt.postgresml.org \
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
--codename $(lsb_release -cs)

4
.gitignore vendored
View File

@@ -10,6 +10,4 @@ lcov.info
dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv
**/__pycache__
.bundle
.venv

View File

@@ -36,11 +36,10 @@ Port at which prometheus exporter listens on.
### connect_timeout
```
path: general.connect_timeout
default: 1000 # milliseconds
default: 5000 # milliseconds
```
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
How long to wait before aborting a server connection (ms).
### idle_timeout
```
@@ -298,19 +297,6 @@ Load balancing mode
`random` selects the server at random
`loc` selects the server with the least outstanding busy connections
### checkout_failure_limit
```
path: pools.<pool_name>.checkout_failure_limit
default: 0 (disabled)
```
`Maximum number of checkout failures a client is allowed before it
gets disconnected. This is needed to prevent persistent client/server
imbalance in high availability setups where multiple PgCat instances are placed
behind a single load balancer. If for any reason a client lands on a PgCat instance that has
a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
in session mode
`
### default_role
```
path: pools.<pool_name>.default_role
@@ -322,45 +308,6 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified.
### db_activity_based_routing
```
path: pools.<pool_name>.db_activity_based_routing
default: false
```
If enabled, PgCat will route queries to the primary if the queried table was recently written to.
Only relevant when `query_parser_enabled` *and* `query_parser_read_write_splitting` is enabled.
##### Considerations:
- *This feature is experimental and may not work as expected.*
- This feature only works when the same PgCat instance is used for both reads and writes to the database.
- This feature is not relevant when the primary is not part of the pool of databases used for load balancing of read queries.
- If more than one PgCat instance is used for HA purposes, this feature will not work as expected. A way to still make it work is by using sticky sessions.
### db_activity_based_ms_init_delay
```
path: pools.<pool_name>.db_activity_based_ms_init_delay
default: 100
```
The delay in milliseconds before the first activity-based routing check is performed.
### db_activity_ttl
```
path: pools.<pool_name>.db_activity_ttl
default: 900
```
The time in seconds after which a DB is considered inactive when no queries/updates are performed to it.
### table_mutation_cache_ms_ttl
```
path: pools.<pool_name>.table_mutation_cache_ms_ttl
default: 50
```
The time in milliseconds after a write to a table that all queries to that table will be routed to the primary.
### prepared_statements_cache_size
```
path: general.prepared_statements_cache_size
@@ -515,18 +462,10 @@ path: pools.<pool_name>.users.<user_index>.pool_size
default: 9
```
Maximum number of server connections that can be established for this user.
Maximum number of server connections that can be established for this user
The maximum number of connection from a single Pgcat process to any database in the cluster
is the sum of pool_size across all users.
### min_pool_size
```
path: pools.<pool_name>.users.<user_index>.min_pool_size
default: 0
```
Minimum number of idle server connections to retain for this pool.
### statement_timeout
```
path: pools.<pool_name>.users.<user_index>.statement_timeout
@@ -536,16 +475,6 @@ default: 0
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
0 means it is disabled.
### connect_timeout
```
path: pools.<pool_name>.users.<user_index>.connect_timeout
default: <UNSET> # milliseconds
```
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
If unset, uses the `connect_timeout` defined globally.
## `pools.<pool_name>.shards.<shard_index>` Section
### servers
@@ -573,3 +502,4 @@ default: "shard0"
```
Database name (e.g. "postgres")

View File

@@ -7,7 +7,7 @@ Thank you for contributing! Just a few tips here:
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
## How to run the integration tests locally and iterate on them
We have integration tests written in Ruby, Python, Go and Rust.
We have integration tests written in Ruby, Python, Go and Rust.
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app/ && pytest`
Python: `cd /app && python3 tests/python/tests.py`
Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test`

298
Cargo.lock generated
View File

@@ -132,7 +132,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -143,7 +143,7 @@ checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -192,11 +192,12 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
[[package]]
name = "bb8"
version = "0.8.6"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
dependencies = [
"async-trait",
"futures-channel",
"futures-util",
"parking_lot",
"tokio",
@@ -229,12 +230,6 @@ version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "bytecount"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"
[[package]]
name = "byteorder"
version = "1.4.3"
@@ -247,37 +242,6 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "camino"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3"
dependencies = [
"serde",
]
[[package]]
name = "cargo-platform"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc"
dependencies = [
"serde",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver",
"serde",
"serde_json",
]
[[package]]
name = "cc"
version = "1.0.79"
@@ -337,7 +301,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -367,21 +331,6 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "crypto-common"
version = "0.1.6"
@@ -392,19 +341,6 @@ dependencies = [
"typenum",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "data-encoding"
version = "2.4.0"
@@ -467,15 +403,6 @@ dependencies = [
"libc",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "exitcode"
version = "1.1.2"
@@ -488,12 +415,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "fnv"
version = "1.0.7"
@@ -565,7 +486,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -625,12 +546,6 @@ version = "0.27.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "h2"
version = "0.4.6"
@@ -1004,21 +919,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "mini-moka"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803"
dependencies = [
"crossbeam-channel",
"crossbeam-utils",
"dashmap",
"skeptic",
"smallvec",
"tagptr",
"triomphe",
]
[[package]]
name = "miniz_oxide"
version = "0.7.1"
@@ -1093,9 +993,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.20.2"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "overload"
@@ -1134,7 +1034,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.3.0"
version = "1.2.0"
dependencies = [
"arc-swap",
"async-trait",
@@ -1156,7 +1056,6 @@ dependencies = [
"log",
"lru",
"md-5",
"mini-moka",
"nix",
"num_cpus",
"once_cell",
@@ -1171,7 +1070,6 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"serial_test",
"sha-1",
"sha2",
"socket2 0.4.9",
@@ -1217,7 +1115,7 @@ dependencies = [
"phf_shared",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -1246,7 +1144,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -1287,24 +1185,13 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.89"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
dependencies = [
"bitflags 2.3.3",
"memchr",
"unicase",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@@ -1313,9 +1200,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.37"
version = "1.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
checksum = "5fe8a65d69dd0808184ebb5f836ab526bb259db23c657efa38711b1072ee47f0"
dependencies = [
"proc-macro2",
]
@@ -1494,24 +1381,6 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "scc"
version = "2.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8d25269dd3a12467afe2e510f69fb0b46b698e5afb296b59f2145259deaf8e8"
dependencies = [
"sdd",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -1528,39 +1397,24 @@ dependencies = [
"untrusted",
]
[[package]]
name = "sdd"
version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49c1eeaf4b6a87c7479688c6d52b9f1153cedd3c489300564f932b065c6eab95"
[[package]]
name = "semver"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
dependencies = [
"serde",
]
[[package]]
name = "serde"
version = "1.0.214"
version = "1.0.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5"
checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.214"
version = "1.0.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766"
checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -1583,31 +1437,6 @@ dependencies = [
"serde",
]
[[package]]
name = "serial_test"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b4b487fe2acf240a021cf57c6b2b4903b1e78ca0ecd862a71b71d2a51fed77d"
dependencies = [
"futures",
"log",
"once_cell",
"parking_lot",
"scc",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
]
[[package]]
name = "sha-1"
version = "0.10.1"
@@ -1654,21 +1483,6 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
name = "slab"
version = "0.4.8"
@@ -1712,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.52.0"
version = "0.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
dependencies = [
"log",
"sqlparser_derive",
@@ -1728,7 +1542,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -1772,34 +1586,15 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.87"
version = "2.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tempfile"
version = "3.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
dependencies = [
"cfg-if",
"fastrand",
"redox_syscall",
"rustix",
"windows-sys",
]
[[package]]
name = "thiserror"
version = "1.0.43"
@@ -1817,7 +1612,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -1884,7 +1679,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -1989,7 +1784,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
]
[[package]]
@@ -2044,12 +1839,6 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "triomphe"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3"
[[package]]
name = "trust-dns-proto"
version = "0.22.0"
@@ -2107,12 +1896,6 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "unicase"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df"
[[package]]
name = "unicode-bidi"
version = "0.3.13"
@@ -2169,16 +1952,6 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.1"
@@ -2221,7 +1994,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
"wasm-bindgen-shared",
]
@@ -2243,7 +2016,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.26",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -2295,15 +2068,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.3.0"
version = "1.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -8,7 +8,7 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
bytes = "1"
md-5 = "0.10"
bb8 = "=0.8.6"
bb8 = "0.8.1"
async-trait = "0.1"
rand = "0.8"
chrono = "0.4"
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = { version = "0.52", features = ["visitor"] }
sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"
@@ -55,10 +55,6 @@ tracing-subscriber = { version = "0.3.17", features = [
"std",
] }
lru = "0.12.0"
mini-moka = "0.10.3"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"
[dev-dependencies]
serial_test = "*"

View File

@@ -1,4 +1,4 @@
FROM rust:1.81.0-slim-bookworm AS builder
FROM rust:1.79.0-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y build-essential

View File

@@ -1,4 +1,4 @@
FROM cimg/rust:1.81.0
FROM cimg/rust:1.79.0
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN sudo apt-get update && \

View File

@@ -231,7 +231,7 @@ User.find_by_email("test@example.com")
```sql
-- Grab a bunch of users from shard 1
SET SHARD TO '1';
SELECT * FROM users LIMIT 10;
SELECT * FROM users LIMT 10;
-- Find by id
SET SHARDING KEY TO '1234';

View File

@@ -1,8 +1,8 @@
apiVersion: v2
name: pgcat
description: A Helam chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
maintainers:
- name: PostgresML
email: team@postgresml.org
appVersion: "1.3.0"
version: 0.2.5
- name: Wildcard
email: support@w6d.io
appVersion: "1.2.0"
version: 0.2.0

View File

@@ -15,7 +15,6 @@ stringData:
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
server_tls = {{ .Values.configuration.general.server_tls }}
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
@@ -51,10 +50,6 @@ stringData:
query_parser_enabled = {{ default true $pool.query_parser_enabled }}
query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }}
primary_reads_enabled = {{ default true $pool.primary_reads_enabled }}
db_activity_based_routing = {{ default false $pool.db_activity_based_routing }}
db_activity_based_ms_init_delay = {{ default 100 $pool.db_activity_based_ms_init_delay }}
db_activity_ttl = {{ default 900 $pool.db_activity_ttl }}
table_mutation_cache_ttl = {{ default 50 $pool.table_mutation_cache_ttl }}
sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }}
{{- range $index, $user := $pool.users }}
@@ -63,21 +58,11 @@ stringData:
##
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
username = {{ $user.username | quote }}
{{- if $user.password }}
password = {{ $user.password | quote }}
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
{{- if $secret }}
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
password = {{ $password | quote }}
{{- end }}
{{- end }}
pool_size = {{ $user.pool_size }}
statement_timeout = {{ default 0 $user.statement_timeout }}
min_pool_size = {{ default 3 $user.min_pool_size }}
{{- if $user.server_lifetime }}
server_lifetime = {{ $user.server_lifetime }}
{{- end }}
statement_timeout = {{ $user.statement_timeout }}
min_pool_size = 3
server_lifetime = 60000
{{- if and $user.server_username $user.server_password }}
server_username = {{ $user.server_username | quote }}
server_password = {{ $user.server_password | quote }}

View File

@@ -175,9 +175,6 @@ configuration:
# Max connection lifetime before it's closed, even if actively used.
server_lifetime: 86400000 # 24 hours
# Whether to use TLS for server connections or not.
server_tls: false
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout: 0 # milliseconds
@@ -298,22 +295,6 @@ configuration:
# ## @param configuration.poolsPostgres.query_parser_read_write_splitting
# query_parser_read_write_splitting: true
# ## Db activity based routing. If enabled, we'll route queries to the primary if the table was recently mutated.
# ## @param configuration.poolsPostgres.db_activity_based_routing
# db_activity_based_routing: false
# ## DB activity based init delay. How long to wait before starting to route queries to the primary after a table mutation.
# ## @param configuration.poolsPostgres.db_activity_based_ms_init_delay
# db_activity_based_ms_init_delay: 100
# ## DB activity TTL. How long before marking the DB as inactive after no mutations or queries.
# ## @param configuration.poolsPostgres.db_activity_ttl
# db_activity_ttl: 900
# ## Table mutation cache TTL. How long to keep track of table mutations.
# ## @param configuration.poolsPostgres.table_mutation_cache_ttl
# table_mutation_cache_ttl: 50
# ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# ## load balancing of read queries. Otherwise, the primary will only be used for write
# ## queries. The primary can always be explicitly selected with our custom protocol.
@@ -334,9 +315,7 @@ configuration:
# ## Credentials for users that may connect to this cluster
# ## @param users [array]
# ## @param users[0].username Name of the env var (required)
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
# ## @param users[0].passwordSecret.name Name of the secret containing the password
# ## @param users[0].passwordSecret.key Key in the secret containing the password
# ## @param users[0].password Value for the env var (required)
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
# users: []

View File

@@ -1,2 +1 @@
sign: false
pages_branch: main

View File

@@ -179,7 +179,7 @@ primary_reads_enabled = true
# `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# default_shard = "shard_0"
# no_shard_specified_behavior = "shard_0"
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?

View File

@@ -7,7 +7,3 @@ systemctl enable pgcat
if ! id pgcat 2> /dev/null; then
useradd -s /usr/bin/false pgcat
fi
if [ -f /etc/pgcat.toml ]; then
systemctl start pgcat
fi

View File

@@ -1,4 +1,3 @@
use crate::config::AuthType;
use crate::errors::Error;
use crate::pool::ConnectionPool;
use crate::server::Server;
@@ -72,7 +71,6 @@ impl AuthPassthrough {
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
let auth_user = crate::config::User {
username: self.user.clone(),
auth_type: AuthType::MD5,
password: Some(self.password.clone()),
server_username: None,
server_password: None,

View File

@@ -14,9 +14,7 @@ use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
use crate::auth_passthrough::refetch_auth_hash;
use crate::config::{
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
};
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
use crate::constants::*;
use crate::messages::*;
use crate::plugins::PluginOutput;
@@ -465,8 +463,8 @@ where
.count()
== 1;
// Kick any client that's not admin while we're in admin-only mode.
if !admin && admin_only {
// Kick any client that's not admin while we're in admin-only mode.
debug!(
"Rejecting non-admin connection to {} when in admin only mode",
pool_name
@@ -483,76 +481,72 @@ where
let process_id: i32 = rand::random();
let secret_key: i32 = rand::random();
// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
let mut prepared_statements_enabled = false;
// Authenticate admin user.
let (transaction_mode, mut server_parameters) = if admin {
let config = get_config();
// TODO: Add SASL support.
// Perform MD5 authentication.
match config.general.admin_auth_type {
AuthType::Trust => (),
AuthType::MD5 => {
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// Compare server and client hashes.
let password_hash = md5_hash_password(
&config.general.admin_username,
&config.general.admin_password,
&salt,
);
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
if password_hash != password_response {
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
warn!("{}", error);
wrong_password(&mut write, username).await?;
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
// Compare server and client hashes.
let password_hash = md5_hash_password(
&config.general.admin_username,
&config.general.admin_password,
&salt,
);
if password_hash != password_response {
let error =
Error::ClientGeneralError("Invalid password".into(), client_identifier);
warn!("{}", error);
wrong_password(&mut write, username).await?;
return Err(error);
}
}
return Err(error);
}
(false, generate_server_parameters_for_admin())
}
// Authenticate normal user.
@@ -579,143 +573,92 @@ where
// Obtain the hash to compare, we give preference to that written in cleartext in config
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
// when the pool was created. If there is no hash there, we try to fetch it one more time.
match pool.settings.user.auth_type {
AuthType::Trust => (),
AuthType::MD5 => {
// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge(&mut write).await?;
let password_hash = if let Some(password) = &pool.settings.user.password {
Some(md5_hash_password(username, password, &salt))
} else {
if !get_config().is_auth_query_configured() {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthImpossible(username.into()));
}
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
let mut hash = (*pool.auth_hash.read()).clone();
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
if hash.is_none() {
warn!(
"Query auth configured \
but no hash password found \
for pool {}. Will try to refetch it.",
pool_name
);
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
let password_hash = if let Some(password) = &pool.settings.user.password {
Some(md5_hash_password(username, password, &salt))
} else {
if !get_config().is_auth_query_configured() {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthImpossible(username.into()));
}
let mut hash = (*pool.auth_hash.read()).clone();
if hash.is_none() {
warn!(
"Query auth configured \
but no hash password found \
for pool {}. Will try to refetch it.",
pool_name
);
match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => {
warn!(
"Password for {}, obtained. Updating.",
client_identifier
);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash.clone());
}
hash = Some(fetched_hash);
}
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthPassthroughError(
err.to_string(),
client_identifier,
));
}
}
};
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
};
// Once we have the resulting hash, we compare with what the client gave us.
// If they do not match and auth query is set up, we try to refetch the hash one more time
// to see if the password has changed since the pool was created.
//
// @TODO: we could end up fetching again the same password twice (see above).
if password_hash.unwrap() != password_response {
warn!(
"Invalid password {}, will try to refetch it.",
client_identifier
);
let fetched_hash = match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => fetched_hash,
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(err);
}
};
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
// Ok password changed in server an auth is possible.
if new_password_hash == password_response {
warn!(
"Password for {}, changed in server. Updating.",
client_identifier
);
match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => {
warn!("Password for {}, obtained. Updating.", client_identifier);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash);
*pool_auth_hash = Some(fetched_hash.clone());
}
} else {
hash = Some(fetched_hash);
}
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(Error::ClientGeneralError(
"Invalid password".into(),
return Err(Error::ClientAuthPassthroughError(
err.to_string(),
client_identifier,
));
}
}
};
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
};
// Once we have the resulting hash, we compare with what the client gave us.
// If they do not match and auth query is set up, we try to refetch the hash one more time
// to see if the password has changed since the pool was created.
//
// @TODO: we could end up fetching again the same password twice (see above).
if password_hash.unwrap() != password_response {
warn!(
"Invalid password {}, will try to refetch it.",
client_identifier
);
let fetched_hash = match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => fetched_hash,
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(err);
}
};
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
// Ok password changed in server an auth is possible.
if new_password_hash == password_response {
warn!(
"Password for {}, changed in server. Updating.",
client_identifier
);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash);
}
} else {
wrong_password(&mut write, username).await?;
return Err(Error::ClientGeneralError(
"Invalid password".into(),
client_identifier,
));
}
}
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
prepared_statements_enabled =
transaction_mode && pool.prepared_statement_cache.is_some();
@@ -859,8 +802,6 @@ where
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
let mut checkout_failure_count: u64 = 0;
self.stats.register(self.stats.clone());
// Result returned by one of the plugins.
@@ -883,7 +824,6 @@ where
};
query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
@@ -1110,25 +1050,7 @@ where
query_router.role(),
err
);
checkout_failure_count += 1;
if let Some(limit) = pool.settings.checkout_failure_limit {
if checkout_failure_count >= limit {
error!(
"Checkout failure limit reached ({} / {}) - disconnecting client",
checkout_failure_count, limit
);
error_response_terminal(
&mut self.write,
&format!(
"checkout failure limit reached ({} / {})",
checkout_failure_count, limit
),
)
.await?;
self.stats.disconnect();
return Ok(());
}
}
continue;
}
};
@@ -1807,14 +1729,13 @@ where
/// and also the pool's statement cache. Add it to extended protocol data.
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
if !self.prepared_statements_enabled {
let client_given_name = Parse::get_name(&message)?;
if !self.prepared_statements_enabled || client_given_name.is_empty() {
debug!("Anonymous parse message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_parse(message, None));
return Ok(());
}
let client_given_name = Parse::get_name(&message)?;
let parse: Parse = (&message).try_into()?;
// Compute the hash of the parse statement
@@ -1852,15 +1773,14 @@ where
/// saved in the client cache.
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
if !self.prepared_statements_enabled {
let client_given_name = Bind::get_name(&message)?;
if !self.prepared_statements_enabled || client_given_name.is_empty() {
debug!("Anonymous bind message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_bind(message, None));
return Ok(());
}
let client_given_name = Bind::get_name(&message)?;
match self.prepared_statements.get(&client_given_name) {
Some((rewritten_parse, _)) => {
let message = Bind::rename(message, &rewritten_parse.name)?;
@@ -1912,7 +1832,8 @@ where
}
let describe: Describe = (&message).try_into()?;
if describe.target == 'P' {
let client_given_name = describe.statement_name.clone();
if describe.target == 'P' || client_given_name.is_empty() {
debug!("Portal describe message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_describe(message, None));
@@ -1920,8 +1841,6 @@ where
return Ok(());
}
let client_given_name = describe.statement_name.clone();
match self.prepared_statements.get(&client_given_name) {
Some((rewritten_parse, _)) => {
let describe = describe.rename(&rewritten_parse.name);

View File

@@ -208,9 +208,6 @@ impl Address {
pub struct User {
pub username: String,
pub password: Option<String>,
#[serde(default = "User::default_auth_type")]
pub auth_type: AuthType,
pub server_username: Option<String>,
pub server_password: Option<String>,
pub pool_size: u32,
@@ -228,7 +225,6 @@ impl Default for User {
User {
username: String::from("postgres"),
password: None,
auth_type: AuthType::MD5,
server_username: None,
server_password: None,
pool_size: 15,
@@ -243,10 +239,6 @@ impl Default for User {
}
impl User {
pub fn default_auth_type() -> AuthType {
AuthType::MD5
}
fn validate(&self) -> Result<(), Error> {
if let Some(min_pool_size) = self.min_pool_size {
if min_pool_size > self.pool_size {
@@ -342,9 +334,6 @@ pub struct General {
pub admin_username: String,
pub admin_password: String,
#[serde(default = "General::default_admin_auth_type")]
pub admin_auth_type: AuthType,
#[serde(default = "General::default_validate_config")]
pub validate_config: bool,
@@ -359,10 +348,6 @@ impl General {
"0.0.0.0".into()
}
pub fn default_admin_auth_type() -> AuthType {
AuthType::MD5
}
pub fn default_port() -> u16 {
5432
}
@@ -471,7 +456,6 @@ impl Default for General {
verify_server_certificate: false,
admin_username: String::from("admin"),
admin_password: String::from("admin"),
admin_auth_type: AuthType::MD5,
validate_config: true,
auth_query: None,
auth_query_user: None,
@@ -492,15 +476,6 @@ pub enum PoolMode {
Session,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
pub enum AuthType {
#[serde(alias = "trust", alias = "Trust")]
Trust,
#[serde(alias = "md5", alias = "MD5")]
MD5,
}
impl std::fmt::Display for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@@ -558,14 +533,6 @@ pub struct Pool {
/// Close idle connections that have been opened for longer than this.
pub idle_timeout: Option<u64>,
/// Maximum number of checkout failures a client is allowed before it
/// gets disconnected. This is needed to prevent persistent client/server
/// imbalance in high availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
/// in session mode
pub checkout_failure_limit: Option<u64>,
/// Close server connections that have been opened for longer than this.
/// Only applied to idle connections. If the connection is actively used for
/// longer than this period, the pool will not interrupt it.
@@ -597,19 +564,6 @@ pub struct Pool {
#[serde(default = "Pool::default_prepared_statements_cache_size")]
pub prepared_statements_cache_size: usize,
// Support for query routing based on database activity
#[serde(default = "Pool::default_db_activity_based_routing")]
pub db_activity_based_routing: bool,
#[serde(default = "Pool::default_db_activity_init_delay")]
pub db_activity_init_delay: u64,
#[serde(default = "Pool::default_db_activity_ttl")]
pub db_activity_ttl: u64,
#[serde(default = "Pool::default_table_mutation_cache_ms_ttl")]
pub table_mutation_cache_ms_ttl: u64,
pub plugins: Option<Plugins>,
pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>,
@@ -663,25 +617,6 @@ impl Pool {
0
}
pub fn default_db_activity_based_routing() -> bool {
false
}
pub fn default_db_activity_init_delay() -> u64 {
// 100 milliseconds
100
}
pub fn default_db_activity_ttl() -> u64 {
// 15 minutes
15 * 60
}
pub fn default_table_mutation_cache_ms_ttl() -> u64 {
// 50 milliseconds
50
}
pub fn validate(&mut self) -> Result<(), Error> {
match self.default_role.as_ref() {
"any" => (),
@@ -764,23 +699,6 @@ impl Pool {
user.validate()?;
}
if self.db_activity_based_routing {
if self.db_activity_init_delay == 0 {
error!("db_activity_init_delay must be greater than 0");
return Err(Error::BadConfig);
}
if self.table_mutation_cache_ms_ttl == 0 {
error!("table_mutation_cache_ms_ttl must be greater than 0");
return Err(Error::BadConfig);
}
if self.db_activity_ttl == 0 {
error!("db_activity_ttl must be greater than 0");
return Err(Error::BadConfig);
}
}
Ok(())
}
}
@@ -790,7 +708,6 @@ impl Default for Pool {
Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
checkout_failure_limit: None,
default_role: String::from("any"),
query_parser_enabled: false,
query_parser_max_length: None,
@@ -811,10 +728,6 @@ impl Default for Pool {
cleanup_server_connections: true,
log_client_parameter_status_changes: false,
prepared_statements_cache_size: Self::default_prepared_statements_cache_size(),
db_activity_based_routing: Self::default_db_activity_based_routing(),
db_activity_init_delay: Self::default_db_activity_init_delay(),
db_activity_ttl: Self::default_db_activity_ttl(),
table_mutation_cache_ms_ttl: Self::default_table_mutation_cache_ms_ttl(),
plugins: None,
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
users: BTreeMap::default(),
@@ -1307,17 +1220,6 @@ impl Config {
None => self.general.idle_timeout,
};
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
match pool_config.checkout_failure_limit {
Some(checkout_failure_limit) => {
info!(
"[pool: {}] Checkout failure limit: {}",
pool_name, checkout_failure_limit
);
}
None => {
info!("[pool: {}] Checkout failure limit: not set", pool_name);
}
};
info!(
"[pool: {}] Sharding function: {}",
pool_name,
@@ -1362,22 +1264,6 @@ impl Config {
"[pool: {}] Cleanup server connections: {}",
pool_name, pool_config.cleanup_server_connections
);
info!(
"[pool: {}] DB activity based routing: {}",
pool_name, pool_config.db_activity_based_routing
);
info!(
"[pool: {}] DB activity init delay: {}",
pool_name, pool_config.db_activity_init_delay
);
info!(
"[pool: {}] DB activity TTL: {}",
pool_name, pool_config.db_activity_ttl
);
info!(
"[pool: {}] Table mutation cache TTL: {}",
pool_name, pool_config.table_mutation_cache_ms_ttl
);
info!(
"[pool: {}] Log client parameter status changes: {}",
pool_name, pool_config.log_client_parameter_status_changes

View File

@@ -3,7 +3,7 @@ use tracing_subscriber;
use tracing_subscriber::EnvFilter;
pub fn init(args: &Args) {
// Initialize a default filter, and then override the builtin default "warning" with our
// Iniitalize a default filter, and then override the builtin default "warning" with our
// commandline, (default: "info")
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());

View File

@@ -821,10 +821,10 @@ impl ExtendedProtocolData {
pub struct Parse {
code: char,
#[allow(dead_code)]
len: i32,
len: u32,
pub name: String,
query: String,
num_params: i16,
num_params: u16,
param_types: Vec<i32>,
}
@@ -834,12 +834,11 @@ impl TryFrom<&BytesMut> for Parse {
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
let mut cursor = Cursor::new(buf);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let len = cursor.get_u32();
let name = cursor.read_string()?;
let query = cursor.read_string()?;
let num_params = cursor.get_i16();
let num_params = cursor.get_u16();
let mut param_types = Vec::new();
for _ in 0..num_params {
param_types.push(cursor.get_i32());
}
@@ -875,10 +874,10 @@ impl TryFrom<Parse> for BytesMut {
+ 4 * parse.num_params as usize;
bytes.put_u8(parse.code as u8);
bytes.put_i32(len as i32);
bytes.put_u32(len as u32);
bytes.put_slice(name);
bytes.put_slice(query);
bytes.put_i16(parse.num_params);
bytes.put_u16(parse.num_params);
for param in parse.param_types {
bytes.put_i32(param);
}
@@ -945,14 +944,14 @@ impl Parse {
pub struct Bind {
code: char,
#[allow(dead_code)]
len: i64,
len: u64,
portal: String,
pub prepared_statement: String,
num_param_format_codes: i16,
num_param_format_codes: u16,
param_format_codes: Vec<i16>,
num_param_values: i16,
num_param_values: u16,
param_values: Vec<(i32, BytesMut)>,
num_result_column_format_codes: i16,
num_result_column_format_codes: u16,
result_columns_format_codes: Vec<i16>,
}
@@ -962,17 +961,17 @@ impl TryFrom<&BytesMut> for Bind {
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
let mut cursor = Cursor::new(buf);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let len = cursor.get_u32();
let portal = cursor.read_string()?;
let prepared_statement = cursor.read_string()?;
let num_param_format_codes = cursor.get_i16();
let num_param_format_codes = cursor.get_u16();
let mut param_format_codes = Vec::new();
for _ in 0..num_param_format_codes {
param_format_codes.push(cursor.get_i16());
}
let num_param_values = cursor.get_i16();
let num_param_values = cursor.get_u16();
let mut param_values = Vec::new();
for _ in 0..num_param_values {
@@ -994,7 +993,7 @@ impl TryFrom<&BytesMut> for Bind {
}
}
let num_result_column_format_codes = cursor.get_i16();
let num_result_column_format_codes = cursor.get_u16();
let mut result_columns_format_codes = Vec::new();
for _ in 0..num_result_column_format_codes {
@@ -1003,7 +1002,7 @@ impl TryFrom<&BytesMut> for Bind {
Ok(Bind {
code,
len: len as i64,
len: len as u64,
portal,
prepared_statement,
num_param_format_codes,
@@ -1042,19 +1041,19 @@ impl TryFrom<Bind> for BytesMut {
len += 2 * bind.num_result_column_format_codes as usize;
bytes.put_u8(bind.code as u8);
bytes.put_i32(len as i32);
bytes.put_u32(len as u32);
bytes.put_slice(portal);
bytes.put_slice(prepared_statement);
bytes.put_i16(bind.num_param_format_codes);
bytes.put_u16(bind.num_param_format_codes);
for param_format_code in bind.param_format_codes {
bytes.put_i16(param_format_code);
}
bytes.put_i16(bind.num_param_values);
bytes.put_u16(bind.num_param_values);
for (param_len, param) in bind.param_values {
bytes.put_i32(param_len);
bytes.put_slice(&param);
}
bytes.put_i16(bind.num_result_column_format_codes);
bytes.put_u16(bind.num_result_column_format_codes);
for result_column_format_code in bind.result_columns_format_codes {
bytes.put_i16(result_column_format_code);
}
@@ -1068,7 +1067,7 @@ impl Bind {
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
let mut cursor = Cursor::new(buf);
// Skip the code and length
cursor.advance(mem::size_of::<u8>() + mem::size_of::<i32>());
cursor.advance(mem::size_of::<u8>() + mem::size_of::<u32>());
cursor.read_string()?;
cursor.read_string()
}
@@ -1078,17 +1077,17 @@ impl Bind {
let mut cursor = Cursor::new(&buf);
// Read basic data from the cursor
let code = cursor.get_u8();
let current_len = cursor.get_i32();
let current_len = cursor.get_u32();
let portal = cursor.read_string()?;
let prepared_statement = cursor.read_string()?;
// Calculate new length
let new_len = current_len + new_name.len() as i32 - prepared_statement.len() as i32;
let new_len = current_len + new_name.len() as u32 - prepared_statement.len() as u32;
// Begin building the response buffer
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
response_buf.put_u8(code);
response_buf.put_i32(new_len);
response_buf.put_u32(new_len);
// Put the portal and new name into the buffer
// Note: panic if the provided string contains null byte
@@ -1112,7 +1111,7 @@ pub struct Describe {
code: char,
#[allow(dead_code)]
len: i32,
len: u32,
pub target: char,
pub statement_name: String,
}
@@ -1123,7 +1122,7 @@ impl TryFrom<&BytesMut> for Describe {
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
let mut cursor = Cursor::new(bytes);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let len = cursor.get_u32();
let target = cursor.get_u8() as char;
let statement_name = cursor.read_string()?;
@@ -1146,7 +1145,7 @@ impl TryFrom<Describe> for BytesMut {
let len = 4 + 1 + statement_name.len();
bytes.put_u8(describe.code as u8);
bytes.put_i32(len as i32);
bytes.put_u32(len as u32);
bytes.put_u8(describe.target as u8);
bytes.put_slice(statement_name);

View File

@@ -152,14 +152,6 @@ pub struct PoolSettings {
/// Random or LeastOutstandingConnections.
pub load_balancing_mode: LoadBalancingMode,
/// Maximum number of checkout failures a client is allowed before it
/// gets disconnected. This is needed to prevent persistent client/server
/// imbalance in high availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
/// in session mode
pub checkout_failure_limit: Option<u64>,
// Number of shards.
pub shards: usize,
@@ -182,18 +174,6 @@ pub struct PoolSettings {
// Read from the primary as well or not.
pub primary_reads_enabled: bool,
// Automatic primary/replica selection based on recent activity.
pub db_activity_based_routing: bool,
// DB activity init delay
pub db_activity_init_delay: u64,
// DB activity TTL
pub db_activity_ttl: u64,
// Table mutation cache TTL
pub table_mutation_cache_ms_ttl: u64,
// Sharding function.
pub sharding_function: ShardingFunction,
@@ -235,7 +215,6 @@ impl Default for PoolSettings {
PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 1,
user: User::default(),
db: String::default(),
@@ -244,10 +223,6 @@ impl Default for PoolSettings {
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: true,
db_activity_based_routing: false,
db_activity_init_delay: 100,
db_activity_ttl: 15 * 60,
table_mutation_cache_ms_ttl: 50,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
healthcheck_delay: General::default_healthcheck_delay(),
@@ -546,7 +521,6 @@ impl ConnectionPool {
None => pool_config.pool_mode,
},
load_balancing_mode: pool_config.load_balancing_mode,
checkout_failure_limit: pool_config.checkout_failure_limit,
// shards: pool_config.shards.clone(),
shards: shard_ids.len(),
user: user.clone(),
@@ -563,10 +537,6 @@ impl ConnectionPool {
.query_parser_read_write_splitting,
primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function,
db_activity_based_routing: pool_config.db_activity_based_routing,
db_activity_init_delay: pool_config.db_activity_init_delay,
db_activity_ttl: pool_config.db_activity_ttl,
table_mutation_cache_ms_ttl: pool_config.table_mutation_cache_ms_ttl,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
healthcheck_delay: config.general.healthcheck_delay,
healthcheck_timeout: config.general.healthcheck_timeout,

View File

@@ -200,17 +200,18 @@ struct PrometheusMetric<Value: fmt::Display> {
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
sorted_labels.sort_by_key(|&(key, _)| key);
let formatted_labels = sorted_labels
let formatted_labels = self
.labels
.iter()
.map(|(key, value)| format!("{}=\"{}\"", key, value))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"{name}{{{formatted_labels}}} {value}",
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
formatted_labels = formatted_labels,
value = self.value
)
@@ -246,7 +247,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels)
}
@@ -263,8 +264,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels)
}
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels)
}
@@ -288,15 +288,6 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("pools_{}", name), value, labels)
}
fn get_header(&self) -> String {
format!(
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
)
}
}
async fn prometheus_stats(
@@ -309,7 +300,6 @@ async fn prometheus_stats(
push_pool_stats(&mut lines);
push_server_stats(&mut lines);
push_database_stats(&mut lines);
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
Response::builder()
.header("content-type", "text/plain; version=0.0.4")
@@ -323,7 +313,6 @@ async fn prometheus_stats(
// Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
@@ -333,10 +322,7 @@ fn push_address_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_address(address, &key, value)
{
grouped_metrics
.entry(key)
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -344,53 +330,33 @@ fn push_address_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{
grouped_metrics
.entry(name)
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for ({})", name, *pool_id);
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone();
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server);
let metrics = vec![
("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections),
@@ -399,10 +365,7 @@ fn push_database_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value)
{
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -410,14 +373,6 @@ fn push_database_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW SERVERS admin command.
@@ -450,7 +405,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
}
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
@@ -473,10 +428,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value)
{
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -485,14 +437,6 @@ fn push_server_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
pub async fn start_metric_server(http_addr: SocketAddr) {

View File

@@ -2,7 +2,6 @@
/// or implied query characteristics.
use bytes::{Buf, BytesMut};
use log::{debug, error};
use mini_moka::sync::Cache;
use once_cell::sync::OnceCell;
use regex::{Regex, RegexSet};
use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update};
@@ -12,7 +11,6 @@ use sqlparser::ast::{
};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::sync::OnceLock;
use crate::config::Role;
use crate::errors::Error;
@@ -23,7 +21,6 @@ use crate::sharding::Sharder;
use std::collections::BTreeSet;
use std::io::Cursor;
use std::time::Duration;
use std::{cmp, mem};
/// Regexes used to parse custom commands.
@@ -69,18 +66,6 @@ static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
// Get the value inside the custom command.
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
#[derive(Debug, Clone, PartialEq)]
enum DatabaseActivityState {
Active,
Initializing,
}
// A moka cache for the databases
// the key is the database name and the value is the database activity state
static DATABASE_ACTIVITY_CACHE: OnceLock<Cache<String, DatabaseActivityState>> = OnceLock::new();
// A moka cache for the tables, the key is the db_table.
static TABLE_MUTATIONS_CACHE: OnceLock<Cache<String, bool>> = OnceLock::new();
/// The query router.
pub struct QueryRouter {
/// Which shard we should be talking to right now.
@@ -102,12 +87,6 @@ pub struct QueryRouter {
placeholders: Vec<i16>,
}
struct ExtractedExprsAndTables<'a> {
exprs: Vec<Expr>,
table_names: Vec<Vec<Ident>>,
assignments_opt: Option<&'a Vec<Assignment>>,
}
impl QueryRouter {
/// One-time initialization of regexes
/// that parse our custom SQL protocol.
@@ -407,53 +386,6 @@ impl QueryRouter {
}
}
/// Determines if a query is a mutation or not.
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
use sqlparser::ast::*;
match q.body.as_ref() {
SetExpr::Insert(_) => true,
SetExpr::Update(_) => true,
SetExpr::Query(q) => Self::is_mutation_query(q),
_ => false,
}
}
fn database_activity_cache(&self) -> Cache<String, DatabaseActivityState> {
DATABASE_ACTIVITY_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_idle(Duration::from_secs(self.pool_settings.db_activity_ttl))
.build()
})
.clone()
}
/// Check database activity state and reset it if necessary
fn database_activity_state(&self, db: &String) -> DatabaseActivityState {
let cache = self.database_activity_cache();
// Exists in cache
if cache.contains_key(db) {
return cache.get(db).unwrap();
}
// Not in cache
debug!("Adding database to cache: {}", db);
cache.insert(db.to_string(), DatabaseActivityState::Initializing);
// Set a timer to update the cache
let db = db.clone();
let db_activity_init_delay = self.pool_settings.db_activity_init_delay;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(db_activity_init_delay)).await;
cache.insert(db, DatabaseActivityState::Active);
});
DatabaseActivityState::Initializing
}
/// Try to infer which server to connect to based on the contents of the query.
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
if !self.pool_settings.query_parser_read_write_splitting {
@@ -468,23 +400,9 @@ impl QueryRouter {
return Err(Error::QueryRouterParserError("empty query".into()));
}
let mut primary_set_based_on_activity = false;
let mut visited_write_statement = false;
let mut prev_inferred_shard = None;
if self.pool_settings.db_activity_based_routing {
let db = self.pool_settings.db.clone();
let state = self.database_activity_state(&db);
debug!("Database activity state: {:?}", state);
if let DatabaseActivityState::Initializing = state {
debug!("Database is initializing, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
}
}
for q in ast {
match q {
// All transactions go to the primary, probably a write.
@@ -495,22 +413,6 @@ impl QueryRouter {
// Likely a read-only query
Query(query) => {
if primary_set_based_on_activity {
// If we already set the role based on activity, we don't need to do it again
continue;
}
if self.pool_settings.db_activity_based_routing {
// Check if the tables in the query have been written to recently
if self.query_handles_tables_in_mutation_cache(query) {
debug!("Query handles tables in mutation cache, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
continue;
}
}
match &self.pool_settings.automatic_sharding_key {
Some(_) => {
// TODO: if we have multiple queries in the same message,
@@ -526,9 +428,8 @@ impl QueryRouter {
};
let has_locks = !query.locks.is_empty();
let has_mutation = Self::is_mutation_query(query);
if has_locks || has_mutation {
if has_locks {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary.
@@ -541,13 +442,6 @@ impl QueryRouter {
// Likely a write
_ => {
debug!("Write statement found, going to primary");
if self.pool_settings.db_activity_based_routing {
// add all of the query tables to the mutation cache
self.update_mutation_cache_on_write(q);
}
match &self.pool_settings.automatic_sharding_key {
Some(_) => {
// TODO: similar to the above, if we have multiple queries in the
@@ -590,69 +484,62 @@ impl QueryRouter {
Ok(())
}
fn table_mutations_cache(&self) -> Cache<String, bool> {
TABLE_MUTATIONS_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_live(Duration::from_millis(
self.pool_settings.table_mutation_cache_ms_ttl,
))
.build()
})
.clone()
}
fn query_handles_tables_in_mutation_cache(&self, query: &sqlparser::ast::Query) -> bool {
let table_mutations_cache = self.table_mutations_cache();
debug!("Checking if query handles tables in mutation cache");
debug!("Table mutations cache: {:?}", table_mutations_cache);
for tables in self.table_names(query) {
for table in tables {
if table_mutations_cache.contains_key(&self.table_mutation_cache_key(table)) {
return true;
}
}
}
false
}
fn extract_exprs_and_table_names<'a>(
&'a self,
q: &'a Statement,
) -> Option<ExtractedExprsAndTables<'a>> {
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
let mut exprs = Vec::new();
// Collect all table names from the query.
let mut table_names = Vec::new();
let mut assignments_opt = None;
match q {
Insert(i) => {
Insert {
or,
into: _,
table_name,
columns,
overwrite: _,
source,
partitioned,
after_columns,
table: _,
on: _,
returning: _,
ignore: _,
} => {
// Not supported in postgres.
assert!(i.or.is_none());
assert!(i.partitioned.is_none());
assert!(i.after_columns.is_empty());
assert!(or.is_none());
assert!(partitioned.is_none());
assert!(after_columns.is_empty());
Self::process_table(&i.table_name, &mut table_names);
if let Some(source) = &i.source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns));
Self::process_table(table_name, &mut table_names);
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
}
}
Delete(d) => {
if let Some(expr) = &d.selection {
Delete {
tables,
from,
using,
selection,
returning: _,
order_by: _,
limit: _,
} => {
if let Some(expr) = selection {
exprs.push(expr.clone());
}
// Multi-tables delete are not supported in postgres.
assert!(d.tables.is_empty());
// Multi tables delete are not supported in postgres.
assert!(tables.is_empty());
if let Some(using_tbl_with_join) = &d.using {
Self::process_tables_with_join(from, &mut exprs, &mut table_names);
if let Some(using_tbl_with_join) = using {
Self::process_tables_with_join(
using_tbl_with_join,
&mut exprs,
&mut table_names,
);
}
Self::process_selection(&d.selection, &mut exprs);
Self::process_selection(selection, &mut exprs);
}
Update {
table,
@@ -666,55 +553,14 @@ impl QueryRouter {
Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names);
}
Self::process_selection(selection, &mut exprs);
assignments_opt = Some(assignments);
}
_ => return None,
};
Some(ExtractedExprsAndTables {
exprs,
table_names,
assignments_opt,
})
}
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
let exprs = extracted.exprs;
let table_names = extracted.table_names;
let assignments_opt = extracted.assignments_opt;
if let Some(assignments) = assignments_opt {
self.assignment_parser(assignments)?;
}
Ok(self.infer_shard_from_exprs(exprs, table_names))
} else {
Ok(None)
}
}
fn update_mutation_cache_on_write(&self, q: &Statement) {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
debug!("Updating mutation cache on write");
let table_names = extracted.table_names;
debug!("Table names in mutation query: {:?}", table_names);
let table_mutations_cache = self.table_mutations_cache();
for tables in table_names {
for table in tables {
table_mutations_cache.insert(self.table_mutation_cache_key(table), true);
}
_ => {
return Ok(None);
}
}
}
};
// combines the database name and table name into a single string
// to be used as the key in the table mutation cache
// e.g. "mydb.mytable"
fn table_mutation_cache_key(&self, table: Ident) -> String {
format!("{}.{}", self.pool_settings.db, table.value)
Ok(self.infer_shard_from_exprs(exprs, table_names))
}
fn process_query(
@@ -963,13 +809,7 @@ impl QueryRouter {
for a in assignments {
if sharding_key[0].value == "*"
&& sharding_key[1].value
== a.target
.to_string()
.split('.')
.last()
.unwrap()
.to_lowercase()
&& sharding_key[1].value == a.id.last().unwrap().value.to_lowercase()
{
return Err(Error::QueryRouterParserError(
"Sharding key cannot be updated.".into(),
@@ -1102,18 +942,6 @@ impl QueryRouter {
self.infer_shard_from_exprs(exprs, table_names)
}
/// get table names from query
fn table_names(&self, query: &sqlparser::ast::Query) -> Vec<Vec<Ident>> {
let mut exprs = Vec::new();
let mut table_names = Vec::new();
Self::process_query(query, &mut exprs, &mut table_names, &None);
debug!("Table names in query: {:?}", table_names);
table_names
}
fn infer_shard_from_exprs(
&mut self,
exprs: Vec<Expr>,
@@ -1220,11 +1048,6 @@ impl QueryRouter {
self.active_shard
}
/// Set active_role as the default_role specified in the pool.
pub fn set_default_role(&mut self) {
self.active_role = self.pool_settings.default_role;
}
/// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> {
self.active_role
@@ -1281,7 +1104,6 @@ mod test {
use crate::messages::simple_query;
use crate::sharding::ShardingFunction;
use bytes::BufMut;
use serial_test::serial;
#[test]
fn test_defaults() {
@@ -1291,26 +1113,6 @@ mod test {
assert_eq!(qr.role(), None);
}
#[test]
fn test_split_cte_queries() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
let query = simple_query(
"WITH t AS (
SELECT id FROM users WHERE name ILIKE '%ja%'
)
UPDATE user_languages
SET settings = '{}'
FROM t WHERE t.id = user_id;",
);
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[test]
fn test_infer_replica() {
QueryRouter::setup();
@@ -1617,7 +1419,6 @@ mod test {
let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 2,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
@@ -1638,10 +1439,6 @@ mod test {
auth_query_password: None,
auth_query_user: None,
db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None,
};
let mut qr = QueryRouter::new();
@@ -1700,7 +1497,6 @@ mod test {
let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: Some(10),
shards: 5,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
@@ -1721,10 +1517,6 @@ mod test {
auth_query_password: None,
auth_query_user: None,
db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None,
};
@@ -2140,150 +1932,4 @@ mod test {
assert_eq!(res, Ok(PluginOutput::Allow));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_initializing_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Initially, the database activity should be in the "Initializing" state
let state = qr.database_activity_state(&qr.pool_settings.db.clone());
assert_eq!(state, DatabaseActivityState::Initializing);
// Check that the router chooses the primary role due to "Initializing" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_active_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
let db_name = qr.pool_settings.db.clone();
let cache = qr.database_activity_cache();
cache.insert(db_name.clone(), DatabaseActivityState::Active);
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Check that the router can choose a replica role when in "Active" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), None); // Default should allow replica due to active state
}
#[tokio::test]
#[serial]
async fn test_table_mutation_cache_on_write() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.table_mutation_cache_ms_ttl = 20_000; // 20 seconds in milliseconds
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("UPDATE some_table SET col1 = 'value' WHERE col2 = 1");
let ast = qr.parse(&query).unwrap();
// Simulate the mutation query which should populate the mutation cache
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
let table_cache_key = qr.table_mutation_cache_key(Ident::new("some_table"));
let cache = qr.table_mutations_cache();
// Ensure the table mutation cache contains the table with recent write
assert!(cache.contains_key(&table_cache_key));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_multi_query() {
use super::*;
use crate::messages::simple_query;
use tokio::time::Duration;
QueryRouter::setup();
let mut qr = QueryRouter::new();
// Configure the pool settings for db_activity_based_routing
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.db = "test_db_activity_routing".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
// First query when database is initializing
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
// Should route to primary because database is initializing
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the initialization delay to pass
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.db_activity_init_delay * 2,
))
.await;
// Next query after database is active
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because database is active and no recent mutations
assert_eq!(qr.role(), None);
// Simulate a write query to update the mutation cache
let query = simple_query("INSERT INTO test_table (id, name) VALUES (1, 'test')");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because it's a write operation
assert_eq!(qr.role(), Some(Role::Primary));
// Immediately run a read query on the same table
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because the table was recently mutated
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the mutation cache TTL to expire
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.table_mutation_cache_ms_ttl * 2,
))
.await;
// Run the read query again after cache expiration
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because mutation cache has expired
assert_eq!(qr.role(), None);
}
}

View File

@@ -698,7 +698,6 @@ impl Server {
))
}
};
trace!("Error: {}", error_code);
match error_code {
@@ -1013,6 +1012,12 @@ impl Server {
// which can leak between clients. This is a best effort to block bad clients
// from poisoning a transaction-mode pool by setting inappropriate session variables
match command.as_str() {
"DISCARD ALL" => {
self.clear_prepared_statement_cache();
}
"DEALLOCATE ALL" => {
self.clear_prepared_statement_cache();
}
"SET" => {
// We don't detect set statements in transactions
// No great way to differentiate between set and set local
@@ -1132,6 +1137,12 @@ impl Server {
has_it
}
fn clear_prepared_statement_cache(&mut self) {
if let Some(cache) = &mut self.prepared_statement_cache {
cache.clear();
}
}
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,

View File

@@ -23,11 +23,11 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
echo "You can rebuild PgCat from within the container by running"
echo "You can rebuild PgCat from within the container by running"
echo -e " ${GREEN}cargo build${RESET}"
echo "and then run the tests again"
echo "==================================="

View File

@@ -1,4 +1,4 @@
FROM rust:1.81.0-slim-bookworm
FROM rust:bullseye
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h

View File

@@ -1,3 +1,2 @@
pytest
psycopg2==2.9.3
psutil==5.9.1
psutil==5.9.1

View File

@@ -1,71 +0,0 @@
import utils
import signal
class TestTrustAuth:
@classmethod
def setup_method(cls):
config= """
[general]
host = "0.0.0.0"
port = 6432
admin_username = "admin_user"
admin_password = ""
admin_auth_type = "trust"
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
auth_type = "trust"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"
[pools.sharded_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
]
database = "shard0"
"""
utils.pgcat_generic_start(config)
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_admin_trust_auth(self):
conn, cur = utils.connect_db_trust(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_normal_trust_auth(self):
conn, cur = utils.connect_db_trust(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
class TestMD5Auth:
@classmethod
def setup_method(cls):
utils.pgcat_start()
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_normal_db_access(self):
conn, cur = utils.connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_admin_db_access(self):
conn, cur = utils.connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)

View File

@@ -1,12 +1,84 @@
from typing import Tuple
import psycopg2
import psutil
import os
import signal
import time
import psycopg2
import utils
SHUTDOWN_TIMEOUT = 5
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()
def test_normal_db_access():
pgcat_start()
conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
def test_admin_db_access():
conn, cur = connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
def test_shutdown_logic():
@@ -14,17 +86,17 @@ def test_shutdown_logic():
# NO ACTIVE QUERIES SIGINT HANDLING
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and send query (not in transaction)
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
cur.execute("COMMIT;")
# Send sigint to pgcat
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries fail after sigint since server should close with no active transactions
@@ -36,18 +108,18 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -66,24 +138,24 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries succeed after sigint since server should still allow transaction to complete
@@ -93,18 +165,18 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -122,30 +194,30 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
start = time.perf_counter()
try:
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("SELECT 1;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
time_taken = time.perf_counter() - start
if time_taken > 0.1:
@@ -155,49 +227,49 @@ def test_shutdown_logic():
else:
raise Exception("Able connect to database during shutdown")
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
conn, cur = utils.connect_db(admin=True)
conn, cur = connect_db(admin=True)
cur.execute("SHOW DATABASES;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
raise Exception(e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
admin_conn, admin_cur = utils.connect_db(admin=True)
admin_conn, admin_cur = connect_db(admin=True)
admin_cur.execute("SHOW DATABASES;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
@@ -205,24 +277,24 @@ def test_shutdown_logic():
except psycopg2.OperationalError as e:
raise Exception("Could not execute admin command:", e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
time.sleep(SHUTDOWN_TIMEOUT + 1)
@@ -236,7 +308,12 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint and expected timeout")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
test_normal_db_access()
test_admin_db_access()
test_shutdown_logic()

View File

@@ -1,110 +0,0 @@
import os
import signal
import time
from typing import Tuple
import tempfile
import psutil
import psycopg2
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def _pgcat_start(config_path: str):
pg_cat_send_signal(signal.SIGTERM)
os.system(f"./target/debug/pgcat {config_path} &")
time.sleep(2)
def pgcat_start():
_pgcat_start(config_path='.circleci/pgcat.toml')
def pgcat_generic_start(config: str):
tmp = tempfile.NamedTemporaryFile()
with open(tmp.name, 'w') as f:
f.write(config)
_pgcat_start(config_path=tmp.name)
def glauth_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if proc.name() == "glauth":
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep glauth'):
raise Exception("glauth not closed after SIGTERM")
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def connect_db_trust(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
db = "pgcat"
else:
user = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()

View File

@@ -1,33 +1,22 @@
GEM
remote: https://rubygems.org/
specs:
activemodel (7.1.4)
activesupport (= 7.1.4)
activerecord (7.1.4)
activemodel (= 7.1.4)
activesupport (= 7.1.4)
timeout (>= 0.4.0)
activesupport (7.1.4)
base64
bigdecimal
activemodel (7.0.4.1)
activesupport (= 7.0.4.1)
activerecord (7.0.4.1)
activemodel (= 7.0.4.1)
activesupport (= 7.0.4.1)
activesupport (7.0.4.1)
concurrent-ruby (~> 1.0, >= 1.0.2)
connection_pool (>= 2.2.5)
drb
i18n (>= 1.6, < 2)
minitest (>= 5.1)
mutex_m
tzinfo (~> 2.0)
ast (2.4.2)
base64 (0.2.0)
bigdecimal (3.1.8)
concurrent-ruby (1.3.4)
connection_pool (2.4.1)
concurrent-ruby (1.1.10)
diff-lcs (1.5.0)
drb (2.2.1)
i18n (1.14.5)
i18n (1.12.0)
concurrent-ruby (~> 1.0)
minitest (5.25.1)
mutex_m (0.2.0)
minitest (5.17.0)
parallel (1.22.1)
parser (3.1.2.0)
ast (~> 2.4.1)
@@ -35,8 +24,7 @@ GEM
pg (1.3.2)
rainbow (3.1.1)
regexp_parser (2.3.1)
rexml (3.3.6)
strscan
rexml (3.2.5)
rspec (3.11.0)
rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0)
@@ -62,12 +50,10 @@ GEM
rubocop-ast (1.17.0)
parser (>= 3.1.1.0)
ruby-progressbar (1.11.0)
strscan (3.1.0)
timeout (0.4.1)
toml (0.3.0)
parslet (>= 1.8.0, < 3.0.0)
toxiproxy (2.0.1)
tzinfo (2.0.6)
tzinfo (2.0.5)
concurrent-ruby (~> 1.0)
unicode-display_width (2.1.0)

View File

@@ -0,0 +1,145 @@
class PostgresMessage
# Base class for common functionality
def encode_string(str)
"#{str}\0" # Encode a string with a null terminator
end
def encode_int16(value)
[value].pack('n') # Encode an Int16
end
def encode_int32(value)
[value].pack('N') # Encode an Int32
end
def message_prefix(type, length)
"#{type}#{encode_int32(length)}" # Message type and length prefix
end
end
class SimpleQueryMessage < PostgresMessage
attr_accessor :query
def initialize(query = "")
@query = query
end
def to_bytes
query_bytes = encode_string(@query)
length = 4 + query_bytes.size # Length includes 4 bytes for length itself
message_prefix('Q', length) + query_bytes
end
end
class ParseMessage < PostgresMessage
attr_accessor :statement_name, :query, :parameter_types
def initialize(statement_name = "", query = "", parameter_types = [])
@statement_name = statement_name
@query = query
@parameter_types = parameter_types
end
def to_bytes
statement_name_bytes = encode_string(@statement_name)
query_bytes = encode_string(@query)
parameter_types_bytes = @parameter_types.pack('N*')
length = 4 + statement_name_bytes.size + query_bytes.size + 2 + parameter_types_bytes.size
message_prefix('P', length) + statement_name_bytes + query_bytes + encode_int16(@parameter_types.size) + parameter_types_bytes
end
end
class BindMessage < PostgresMessage
attr_accessor :portal_name, :statement_name, :parameter_format_codes, :parameters, :result_column_format_codes
def initialize(portal_name = "", statement_name = "", parameter_format_codes = [], parameters = [], result_column_format_codes = [])
@portal_name = portal_name
@statement_name = statement_name
@parameter_format_codes = parameter_format_codes
@parameters = parameters
@result_column_format_codes = result_column_format_codes
end
def to_bytes
portal_name_bytes = encode_string(@portal_name)
statement_name_bytes = encode_string(@statement_name)
parameter_format_codes_bytes = @parameter_format_codes.pack('n*')
parameters_bytes = @parameters.map do |param|
if param.nil?
encode_int32(-1)
else
encode_int32(param.bytesize) + param
end
end.join
result_column_format_codes_bytes = @result_column_format_codes.pack('n*')
length = 4 + portal_name_bytes.size + statement_name_bytes.size + 2 + parameter_format_codes_bytes.size + 2 + parameters_bytes.size + 2 + result_column_format_codes_bytes.size
message_prefix('B', length) + portal_name_bytes + statement_name_bytes + encode_int16(@parameter_format_codes.size) + parameter_format_codes_bytes + encode_int16(@parameters.size) + parameters_bytes + encode_int16(@result_column_format_codes.size) + result_column_format_codes_bytes
end
end
class DescribeMessage < PostgresMessage
attr_accessor :type, :name
def initialize(type = 'S', name = "")
@type = type
@name = name
end
def to_bytes
name_bytes = encode_string(@name)
length = 4 + 1 + name_bytes.size
message_prefix('D', length) + @type + name_bytes
end
end
class ExecuteMessage < PostgresMessage
attr_accessor :portal_name, :max_rows
def initialize(portal_name = "", max_rows = 0)
@portal_name = portal_name
@max_rows = max_rows
end
def to_bytes
portal_name_bytes = encode_string(@portal_name)
length = 4 + portal_name_bytes.size + 4
message_prefix('E', length) + portal_name_bytes + encode_int32(@max_rows)
end
end
class FlushMessage < PostgresMessage
def to_bytes
length = 4
message_prefix('H', length)
end
end
class SyncMessage < PostgresMessage
def to_bytes
length = 4
message_prefix('S', length)
end
end
class CloseMessage < PostgresMessage
attr_accessor :type, :name
def initialize(type = 'S', name = "")
@type = type
@name = name
end
def to_bytes
name_bytes = encode_string(@name)
length = 4 + 1 + name_bytes.size
message_prefix('C', length) + @type + name_bytes
end
end

View File

@@ -1,5 +1,6 @@
require 'socket'
require 'digest/md5'
require_relative 'frontend_messages'
BACKEND_MESSAGE_CODES = {
'Z' => "ReadyForQuery",
@@ -18,9 +19,13 @@ class PostgresSocket
@host = host
@socket = TCPSocket.new @host, @port
@parameters = {}
@verbose = true
@verbose = false
end
def send_message(message)
@socket.write(message.to_bytes)
end
def send_md5_password_message(username, password, salt)
m = Digest::MD5.hexdigest(password + username)
m = Digest::MD5.hexdigest(m + salt.map(&:chr).join(""))
@@ -113,107 +118,6 @@ class PostgresSocket
log "[F] Sent CancelRequest message"
end
def send_query_message(query)
query_size = query.length
message_size = 1 + 4 + query_size
message = []
message << "Q".ord
message << [message_size].pack('l>').unpack('CCCC') # 4
message << query.split('').map(&:ord) # 2, 11
message << 0 # 1, 12
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent Q message (#{query})"
end
def send_parse_message(query)
query_size = query.length
message_size = 2 + 2 + 4 + query_size
message = []
message << "P".ord
message << [message_size].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << query.split('').map(&:ord) # 2, 11
message << 0 # 1, 12
message << [0, 0]
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent P message (#{query})"
end
def send_bind_message
message = []
message << "B".ord
message << [12].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << 0 # unnamed statement
message << [0, 0] # 2
message << [0, 0] # 2
message << [0, 0] # 2
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent B message"
end
def send_describe_message(mode)
message = []
message << "D".ord
message << [6].pack('l>').unpack('CCCC') # 4
message << mode.ord
message << 0 # unnamed statement
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent D message"
end
def send_execute_message(limit=0)
message = []
message << "E".ord
message << [9].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << [limit].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent E message"
end
def send_sync_message
message = []
message << "S".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent S message"
end
def send_copydone_message
message = []
message << "c".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent c message"
end
def send_copyfail_message
message = []
message << "f".ord
message << [5].pack('l>').unpack('CCCC') # 4
message << 0
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent f message"
end
def send_flush_message
message = []
message << "H".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent H message"
end
def read_from_server()
output_messages = []
retry_count = 0

View File

@@ -56,41 +56,6 @@ describe "Random Load Balancing" do
end
end
end
context "when all replicas are down " do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length
# Take down all replicas
processes[:replicas].each(&:take_down)
(number_of_replicas + 1).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(number_of_replicas + 1)
failed_count = 0
# Ban_time is configured to 60 so this reset will only work
# if the replicas are unbanned automatically
processes[:replicas].each(&:reset)
number_of_replicas.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(0)
end
end
end
describe "Least Outstanding Queries Load Balancing" do
@@ -196,3 +161,4 @@ describe "Least Outstanding Queries Load Balancing" do
end
end
end

View File

@@ -188,102 +188,6 @@ describe "Miscellaneous" do
end
end
describe "Checkout failure limit" do
context "when no checkout failure limit is set" do
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 200
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
sleep 0.5
end
it "does not disconnect client" do
Array.new(5) do
Thread.new do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
for i in 0..4
begin
conn.async_exec("SELECT pg_sleep(0.5);")
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::SystemError
expect(conn.status).to eq(PG::CONNECTION_OK)
end
end
conn.close
end
end.each(&:join)
end
end
context "when checkout failure limit is set high" do
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 200
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 10000
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
sleep 0.5
end
it "does not disconnect client" do
Array.new(5) do
Thread.new do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
for i in 0..4
begin
conn.async_exec("SELECT pg_sleep(0.5);")
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::SystemError
expect(conn.status).to eq(PG::CONNECTION_OK)
end
end
conn.close
end
end.each(&:join)
end
end
context "when checkout failure limit is set low" do
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 200
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 2
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
sleep 0.5
end
it "disconnects client after reaching limit" do
Array.new(5) do
Thread.new do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
checkout_failure_count = 0
for i in 0..4
begin
conn.async_exec("SELECT pg_sleep(1);")
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::SystemError
checkout_failure_count += 1
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::ConnectionBad
expect(checkout_failure_count).to eq(2)
expect(conn.status).to eq(PG::CONNECTION_BAD)
break
end
end
conn.close
end
end.each(&:join)
puts processes.pgcat.logs
end
end
end
describe "Server version reporting" do
it "reports correct version for normal and admin databases" do
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

View File

@@ -16,10 +16,14 @@ describe "Portocol handling" do
end
def run_comparison(sequence, socket_a, socket_b)
sequence.each do |msg, *args|
socket_a.send(msg, *args)
socket_b.send(msg, *args)
sequence.each do |msg|
if msg.is_a?(Symbol)
socket_a.send(msg)
socket_b.send(msg)
else
socket_a.send_message(msg)
socket_b.send_message(msg)
end
compare_messages(
socket_a.read_from_server,
socket_b.read_from_server
@@ -83,9 +87,9 @@ describe "Portocol handling" do
context "Cancel Query" do
let(:sequence) {
[
[:send_query_message, "SELECT pg_sleep(5)"],
[:cancel_query]
[
SimpleQueryMessage.new("SELECT pg_sleep(5)"),
:cancel_query
]
}
@@ -95,12 +99,12 @@ describe "Portocol handling" do
xcontext "Simple query after parse" do
let(:sequence) {
[
[:send_parse_message, "SELECT 5"],
[:send_query_message, "SELECT 1"],
[:send_bind_message],
[:send_describe_message, "P"],
[:send_execute_message],
[:send_sync_message],
ParseMessage.new("", "SELECT 5", []),
SimpleQueryMessage.new("SELECT 1"),
BindMessage.new("", "", [], [], [0]),
DescribeMessage.new("P", ""),
ExecuteMessage.new("", 1),
SyncMessage.new
]
}
@@ -111,8 +115,8 @@ describe "Portocol handling" do
xcontext "Flush message" do
let(:sequence) {
[
[:send_parse_message, "SELECT 1"],
[:send_flush_message]
ParseMessage.new("", "SELECT 1", []),
FlushMessage.new
]
}
@@ -122,9 +126,7 @@ describe "Portocol handling" do
xcontext "Bind without parse" do
let(:sequence) {
[
[:send_bind_message]
]
[BindMessage.new("", "", [], [], [0])]
}
# This is known to fail.
# Server responds immediately, Proxy buffers the message
@@ -133,23 +135,155 @@ describe "Portocol handling" do
context "Simple message" do
let(:sequence) {
[[:send_query_message, "SELECT 1"]]
[SimpleQueryMessage.new("SELECT 1")]
}
it_behaves_like "at parity with database"
end
10.times do |i|
context "Extended protocol" do
let(:sequence) {
[
ParseMessage.new("", "SELECT 1", []),
BindMessage.new("", "", [], [], [0]),
DescribeMessage.new("S", ""),
ExecuteMessage.new("", 1),
SyncMessage.new
]
}
context "Extended protocol" do
let(:sequence) {
[
[:send_parse_message, "SELECT 1"],
[:send_bind_message],
[:send_describe_message, "P"],
[:send_execute_message],
[:send_sync_message],
]
}
it_behaves_like "at parity with database"
it_behaves_like "at parity with database"
end
end
end
describe "Protocol-level prepared statements" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "transaction") }
before do
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
table_query = "CREATE TABLE IF NOT EXISTS employees (employee_id SERIAL PRIMARY KEY, salary NUMERIC(10, 2) CHECK (salary > 0));"
q_sock.send_message(SimpleQueryMessage.new(table_query))
q_sock.close
current_configs = processes.pgcat.current_config
current_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = 500
processes.pgcat.update_config(current_configs)
processes.pgcat.reload_config
end
after do
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
table_query = "DROP TABLE IF EXISTS employees;"
q_sock.send_message(SimpleQueryMessage.new(table_query))
q_sock.close
end
context "When unnamed prepared statements are used" do
it "does not cache them" do
socket = PostgresSocket.new('localhost', processes.pgcat.port)
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
socket.read_from_server
10.times do |i|
socket.send_message(ParseMessage.new("", "SELECT #{i}", []))
socket.send_message(BindMessage.new("", "", [], [], [0]))
socket.send_message(DescribeMessage.new("S", ""))
socket.send_message(ExecuteMessage.new("", 1))
socket.send_message(SyncMessage.new)
socket.read_from_server
end
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
result = socket.read_from_server
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
expect(number_of_saved_statements).to eq(0)
end
end
context "When named prepared statements are used" do
it "caches them" do
socket = PostgresSocket.new('localhost', processes.pgcat.port)
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
socket.read_from_server
3.times do
socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
socket.send_message(BindMessage.new("", "my_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
socket.send_message(SyncMessage.new)
socket.read_from_server
end
3.times do
socket.send_message(ParseMessage.new("my_other_query", "SELECT * FROM employees WHERE salary in ($1,$2,$3)", [0,0,0]))
socket.send_message(BindMessage.new("", "my_other_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
socket.send_message(SyncMessage.new)
socket.read_from_server
end
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
result = socket.read_from_server
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
expect(number_of_saved_statements).to eq(2)
end
end
context "When DISCARD ALL/DEALLOCATE ALL are called" do
it "resets server and client caches" do
socket = PostgresSocket.new('localhost', processes.pgcat.port)
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
20.times do |i|
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
end
20.times do |i|
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
end
socket.send_message(SyncMessage.new)
socket.read_from_server
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
socket.read_from_server
responses = []
4.times do |i|
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
socket.send_message(SyncMessage.new)
responses += socket.read_from_server
end
errors = responses.select { |message| message[:code] == 'E' }
error_message = errors.map { |message| message[:bytes].map(&:chr).join("") }.join("\n")
raise StandardError, "Encountered the following errors: #{error_message}" if errors.length > 0
end
end
context "Maximum number of bound paramters" do
it "does not crash" do
test_socket = PostgresSocket.new('localhost', processes.pgcat.port)
test_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
types = Array.new(65_535) { |i| 0 }
params = Array.new(65_535) { |i| "$#{i+1}" }.join(",")
test_socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in (#{params})", types))
test_socket.send_message(BindMessage.new("my_query", "my_query", types, types.map(&:to_s), types))
test_socket.send_message(SyncMessage.new)
# If the proxy crashes, this will raise an error
expect { test_socket.read_from_server }.to_not raise_error
test_socket.close
end
end
end
end