mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
45 Commits
pgcat-0.1.
...
pgcat-0.2.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ee59c0c40 | ||
|
|
b61d2cc6f0 | ||
|
|
c11418c083 | ||
|
|
c9544bdff2 | ||
|
|
cdcfa99fb9 | ||
|
|
f27dc6b483 | ||
|
|
326efc22b3 | ||
|
|
01c6afb2e5 | ||
|
|
a68071dd28 | ||
|
|
c27d801abf | ||
|
|
186e72298f | ||
|
|
3935366d86 | ||
|
|
b575935b1d | ||
|
|
efbab1c333 | ||
|
|
9f12d7958e | ||
|
|
e6634ef461 | ||
|
|
dab2e58647 | ||
|
|
4aaa4378cf | ||
|
|
670311daf9 | ||
|
|
b9ec7f8036 | ||
|
|
d91d23848b | ||
|
|
bbbc01a467 | ||
|
|
9bb71ede9d | ||
|
|
88b2afb19b | ||
|
|
f0865ca616 | ||
|
|
7d047c6c19 | ||
|
|
f73d15f82c | ||
|
|
69af6cc5e5 | ||
|
|
ca34597002 | ||
|
|
2def40ea6a | ||
|
|
c05129018d | ||
|
|
4a7a6a8e7a | ||
|
|
29a476e190 | ||
|
|
81933b918d | ||
|
|
7cbc9178d8 | ||
|
|
2c8b2f0776 | ||
|
|
8f9a2b8e6f | ||
|
|
cbf4d58144 | ||
|
|
731aa047ba | ||
|
|
88dbcc21d1 | ||
|
|
c34b15bddc | ||
|
|
0b034a6831 | ||
|
|
966b8e093c | ||
|
|
c9270a47d4 | ||
|
|
0d94d0b90a |
@@ -59,6 +59,7 @@ admin_password = "admin_pass"
|
||||
# session: one server connection per connected client
|
||||
# transaction: one server connection per client transaction
|
||||
pool_mode = "transaction"
|
||||
prepared_statements_cache_size = 500
|
||||
|
||||
# If the client doesn't specify, route traffic to
|
||||
# this role by default.
|
||||
@@ -141,6 +142,7 @@ query_parser_enabled = true
|
||||
query_parser_read_write_splitting = true
|
||||
primary_reads_enabled = true
|
||||
sharding_function = "pg_bigint_hash"
|
||||
prepared_statements_cache_size = 500
|
||||
|
||||
[pools.simple_db.users.0]
|
||||
username = "simple_user"
|
||||
|
||||
@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||
|
||||
# Start Toxiproxy
|
||||
kill -9 $(pgrep toxiproxy) || true
|
||||
LOG_LEVEL=error toxiproxy-server &
|
||||
sleep 1
|
||||
|
||||
@@ -106,7 +107,7 @@ cd ../..
|
||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||
#
|
||||
pip3 install -r tests/python/requirements.txt
|
||||
python3 tests/python/tests.py || exit 1
|
||||
pytest || exit 1
|
||||
|
||||
|
||||
#
|
||||
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
|
||||
|
||||
# Allow for graceful shutdown
|
||||
sleep 1
|
||||
|
||||
kill -9 $(pgrep toxiproxy)
|
||||
sleep 1
|
||||
|
||||
16
.github/workflows/build-and-push.yaml
vendored
16
.github/workflows/build-and-push.yaml
vendored
@@ -23,14 +23,17 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout Repository
|
||||
uses: actions/checkout@v3
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Determine tags
|
||||
id: metadata
|
||||
uses: docker/metadata-action@v4
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: ${{ env.registry }}/${{ env.image-name }}
|
||||
tags: |
|
||||
@@ -42,15 +45,18 @@ jobs:
|
||||
type=raw,value=latest,enable={{ is_default_branch }}
|
||||
|
||||
- name: Log in to the Container registry
|
||||
uses: docker/login-action@v2.1.0
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ${{ env.registry }}
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Build and push ${{ env.image-name }}
|
||||
uses: docker/build-push-action@v3
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
platforms: linux/amd64,linux/arm64
|
||||
provenance: false
|
||||
push: true
|
||||
tags: ${{ steps.metadata.outputs.tags }}
|
||||
labels: ${{ steps.metadata.outputs.labels }}
|
||||
|
||||
4
.github/workflows/chart-lint-test.yaml
vendored
4
.github/workflows/chart-lint-test.yaml
vendored
@@ -22,7 +22,7 @@ jobs:
|
||||
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4.1.0
|
||||
uses: actions/setup-python@v5.1.0
|
||||
with:
|
||||
python-version: 3.7
|
||||
|
||||
@@ -43,7 +43,7 @@ jobs:
|
||||
run: ct lint --config ct.yaml
|
||||
|
||||
- name: Create kind cluster
|
||||
uses: helm/kind-action@v1.7.0
|
||||
uses: helm/kind-action@v1.10.0
|
||||
if: steps.list-changed.outputs.changed == 'true'
|
||||
|
||||
- name: Run chart-testing (install)
|
||||
|
||||
2
.github/workflows/chart-release.yaml
vendored
2
.github/workflows/chart-release.yaml
vendored
@@ -32,7 +32,7 @@ jobs:
|
||||
version: v3.13.0
|
||||
|
||||
- name: Run chart-releaser
|
||||
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
|
||||
uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
|
||||
with:
|
||||
charts_dir: charts
|
||||
config: cr.yaml
|
||||
|
||||
15
.github/workflows/publish-deb-package.yml
vendored
15
.github/workflows/publish-deb-package.yml
vendored
@@ -1,6 +1,9 @@
|
||||
name: pgcat package (deb)
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- v*
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
packageVersion:
|
||||
@@ -16,6 +19,14 @@ jobs:
|
||||
runs-on: ${{ matrix.os }}
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set package version
|
||||
if: github.event_name == 'push' # For push event
|
||||
run: |
|
||||
TAG=${{ github.ref_name }}
|
||||
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
|
||||
- name: Set package version (manual dispatch)
|
||||
if: github.event_name == 'workflow_dispatch' # For manual dispatch
|
||||
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
@@ -39,10 +50,10 @@ jobs:
|
||||
export ARCH=arm64
|
||||
fi
|
||||
|
||||
bash utilities/deb.sh ${{ inputs.packageVersion }}
|
||||
bash utilities/deb.sh ${{ env.packageVersion }}
|
||||
|
||||
deb-s3 upload \
|
||||
--lock \
|
||||
--bucket apt.postgresml.org \
|
||||
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
||||
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
||||
--codename $(lsb_release -cs)
|
||||
|
||||
4
.gitignore
vendored
4
.gitignore
vendored
@@ -10,4 +10,6 @@ lcov.info
|
||||
dev/.bash_history
|
||||
dev/cache
|
||||
!dev/cache/.keepme
|
||||
.venv
|
||||
.venv
|
||||
**/__pycache__
|
||||
.bundle
|
||||
26
CONFIG.md
26
CONFIG.md
@@ -36,10 +36,11 @@ Port at which prometheus exporter listens on.
|
||||
### connect_timeout
|
||||
```
|
||||
path: general.connect_timeout
|
||||
default: 5000 # milliseconds
|
||||
default: 1000 # milliseconds
|
||||
```
|
||||
|
||||
How long to wait before aborting a server connection (ms).
|
||||
How long the client waits to obtain a server connection before aborting (ms).
|
||||
This is similar to PgBouncer's `query_wait_timeout`.
|
||||
|
||||
### idle_timeout
|
||||
```
|
||||
@@ -462,10 +463,18 @@ path: pools.<pool_name>.users.<user_index>.pool_size
|
||||
default: 9
|
||||
```
|
||||
|
||||
Maximum number of server connections that can be established for this user
|
||||
Maximum number of server connections that can be established for this user.
|
||||
The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||
is the sum of pool_size across all users.
|
||||
|
||||
### min_pool_size
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.min_pool_size
|
||||
default: 0
|
||||
```
|
||||
|
||||
Minimum number of idle server connections to retain for this pool.
|
||||
|
||||
### statement_timeout
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
||||
@@ -475,6 +484,16 @@ default: 0
|
||||
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
0 means it is disabled.
|
||||
|
||||
### connect_timeout
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.connect_timeout
|
||||
default: <UNSET> # milliseconds
|
||||
```
|
||||
|
||||
How long the client waits to obtain a server connection before aborting (ms).
|
||||
This is similar to PgBouncer's `query_wait_timeout`.
|
||||
If unset, uses the `connect_timeout` defined globally.
|
||||
|
||||
## `pools.<pool_name>.shards.<shard_index>` Section
|
||||
|
||||
### servers
|
||||
@@ -502,4 +521,3 @@ default: "shard0"
|
||||
```
|
||||
|
||||
Database name (e.g. "postgres")
|
||||
|
||||
|
||||
@@ -6,6 +6,32 @@ Thank you for contributing! Just a few tips here:
|
||||
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||
|
||||
## How to run the integration tests locally and iterate on them
|
||||
We have integration tests written in Ruby, Python, Go and Rust.
|
||||
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
||||
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
||||
|
||||
|
||||
Quite simply, make sure you have docker installed and then run
|
||||
`./start_test_env.sh`
|
||||
|
||||
That is it!
|
||||
|
||||
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
|
||||
|
||||
Once the environment is ready, you can run the tests by running
|
||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||
Python: `cd /app/ && pytest`
|
||||
Rust: `cd /app/tests/rust && cargo run`
|
||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||
|
||||
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
|
||||
To rebuild PgCat, just run `cargo build` within the container under `/app`
|
||||
|
||||

|
||||
|
||||
|
||||
|
||||
Happy hacking!
|
||||
|
||||
## TODOs
|
||||
|
||||
109
Cargo.lock
generated
109
Cargo.lock
generated
@@ -146,6 +146,12 @@ dependencies = [
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
|
||||
|
||||
[[package]]
|
||||
name = "atomic_enum"
|
||||
version = "0.2.0"
|
||||
@@ -186,12 +192,11 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
|
||||
|
||||
[[package]]
|
||||
name = "bb8"
|
||||
version = "0.8.1"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
|
||||
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"parking_lot",
|
||||
"tokio",
|
||||
@@ -542,29 +547,23 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.20"
|
||||
version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049"
|
||||
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
|
||||
dependencies = [
|
||||
"atomic-waker",
|
||||
"bytes",
|
||||
"fnv",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"http",
|
||||
"indexmap 1.9.3",
|
||||
"indexmap",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.14.0"
|
||||
@@ -609,9 +608,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "0.2.9"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
|
||||
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@@ -620,12 +619,24 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "http-body"
|
||||
version = "0.4.5"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
|
||||
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http-body-util"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
@@ -643,13 +654,12 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.27"
|
||||
version = "1.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
|
||||
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
@@ -658,13 +668,26 @@ dependencies = [
|
||||
"httpdate",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
"socket2 0.4.9",
|
||||
"smallvec",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"want",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone"
|
||||
version = "0.1.57"
|
||||
@@ -709,16 +732,6 @@ dependencies = [
|
||||
"unicode-normalization",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.0.0"
|
||||
@@ -726,7 +739,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown 0.14.0",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -848,7 +861,7 @@ version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.0",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1020,7 +1033,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
|
||||
|
||||
[[package]]
|
||||
name = "pgcat"
|
||||
version = "1.1.2-dev4"
|
||||
version = "1.2.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
@@ -1034,7 +1047,9 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"futures",
|
||||
"hmac",
|
||||
"http-body-util",
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"itertools",
|
||||
"jemallocator",
|
||||
"log",
|
||||
@@ -1478,9 +1493,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.11.0"
|
||||
version = "1.13.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
|
||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
@@ -1510,9 +1525,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.34.0"
|
||||
version = "0.41.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
|
||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
||||
dependencies = [
|
||||
"log",
|
||||
"sqlparser_derive",
|
||||
@@ -1520,13 +1535,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser_derive"
|
||||
version = "0.1.1"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
|
||||
checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1741,19 +1756,13 @@ version = "0.19.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
|
||||
dependencies = [
|
||||
"indexmap 2.0.0",
|
||||
"indexmap",
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.37"
|
||||
|
||||
17
Cargo.toml
17
Cargo.toml
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "pgcat"
|
||||
version = "1.1.2-dev4"
|
||||
version = "1.2.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
@@ -8,7 +8,7 @@ edition = "2021"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
bytes = "1"
|
||||
md-5 = "0.10"
|
||||
bb8 = "0.8.1"
|
||||
bb8 = "=0.8.6"
|
||||
async-trait = "0.1"
|
||||
rand = "0.8"
|
||||
chrono = "0.4"
|
||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
sqlparser = {version = "0.34", features = ["visitor"] }
|
||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
parking_lot = "0.12.1"
|
||||
@@ -29,7 +29,9 @@ base64 = "0.21"
|
||||
stringprep = "0.1"
|
||||
tokio-rustls = "0.24"
|
||||
rustls-pemfile = "1"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
http-body-util = "0.1.2"
|
||||
hyper = { version = "1.4.1", features = ["full"] }
|
||||
hyper-util = { version = "0.1.7", features = ["tokio"] }
|
||||
phf = { version = "0.11.1", features = ["macros"] }
|
||||
exitcode = "1.1.2"
|
||||
futures = "0.3"
|
||||
@@ -47,9 +49,12 @@ serde_json = "1"
|
||||
itertools = "0.10"
|
||||
clap = { version = "4.3.1", features = ["derive", "env"] }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
|
||||
tracing-subscriber = { version = "0.3.17", features = [
|
||||
"json",
|
||||
"env-filter",
|
||||
"std",
|
||||
] }
|
||||
lru = "0.12.0"
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
jemallocator = "0.5.0"
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:1-slim-bookworm AS builder
|
||||
FROM rust:1.79.0-slim-bookworm AS builder
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
@@ -19,3 +19,4 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
|
||||
WORKDIR /etc/pgcat
|
||||
ENV RUST_LOG=info
|
||||
CMD ["pgcat"]
|
||||
STOPSIGNAL SIGINT
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM cimg/rust:1.67.1
|
||||
FROM cimg/rust:1.79.0
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
RUN /bin/yj -h
|
||||
RUN sudo apt-get update && \
|
||||
|
||||
@@ -268,6 +268,8 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
|
||||
|
||||
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
|
||||
|
||||
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
|
||||
|
||||
### Live configuration reloading
|
||||
|
||||
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
|
||||
|
||||
@@ -4,5 +4,5 @@ description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBounce
|
||||
maintainers:
|
||||
- name: Wildcard
|
||||
email: support@w6d.io
|
||||
appVersion: "1.1.1"
|
||||
version: 0.1.0
|
||||
appVersion: "1.2.0"
|
||||
version: 0.2.4
|
||||
|
||||
@@ -15,6 +15,7 @@ stringData:
|
||||
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
||||
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
||||
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
||||
server_tls = {{ .Values.configuration.general.server_tls }}
|
||||
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
||||
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
||||
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
||||
@@ -58,11 +59,21 @@ stringData:
|
||||
##
|
||||
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
||||
username = {{ $user.username | quote }}
|
||||
{{- if $user.password }}
|
||||
password = {{ $user.password | quote }}
|
||||
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
|
||||
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
|
||||
{{- if $secret }}
|
||||
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
|
||||
password = {{ $password | quote }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
pool_size = {{ $user.pool_size }}
|
||||
statement_timeout = {{ $user.statement_timeout }}
|
||||
min_pool_size = 3
|
||||
server_lifetime = 60000
|
||||
statement_timeout = {{ default 0 $user.statement_timeout }}
|
||||
min_pool_size = {{ default 3 $user.min_pool_size }}
|
||||
{{- if $user.server_lifetime }}
|
||||
server_lifetime = {{ $user.server_lifetime }}
|
||||
{{- end }}
|
||||
{{- if and $user.server_username $user.server_password }}
|
||||
server_username = {{ $user.server_username | quote }}
|
||||
server_password = {{ $user.server_password | quote }}
|
||||
|
||||
@@ -170,13 +170,16 @@ configuration:
|
||||
connect_timeout: 5000
|
||||
|
||||
# How long an idle connection with a server is left open (ms).
|
||||
idle_timeout: 30000 # milliseconds
|
||||
idle_timeout: 30000 # milliseconds
|
||||
|
||||
# Max connection lifetime before it's closed, even if actively used.
|
||||
server_lifetime: 86400000 # 24 hours
|
||||
server_lifetime: 86400000 # 24 hours
|
||||
|
||||
# Whether to use TLS for server connections or not.
|
||||
server_tls: false
|
||||
|
||||
# How long a client is allowed to be idle while in a transaction (ms).
|
||||
idle_client_in_transaction_timeout: 0 # milliseconds
|
||||
idle_client_in_transaction_timeout: 0 # milliseconds
|
||||
|
||||
# @param configuration.general.healthcheck_timeout How much time to give `SELECT 1` health check query to return with a result (ms).
|
||||
healthcheck_timeout: 1000
|
||||
@@ -240,7 +243,15 @@ configuration:
|
||||
## the pool_name is what clients use as database name when connecting
|
||||
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
|
||||
## @param [object]
|
||||
pools: []
|
||||
pools:
|
||||
[{
|
||||
name: "simple", pool_mode: "transaction",
|
||||
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
|
||||
shards: [{
|
||||
servers: [{host: "postgres", port: 5432, role: "primary"}],
|
||||
database: "postgres"
|
||||
}]
|
||||
}]
|
||||
# - ## default values
|
||||
# ##
|
||||
# ##
|
||||
@@ -307,7 +318,9 @@ configuration:
|
||||
# ## Credentials for users that may connect to this cluster
|
||||
# ## @param users [array]
|
||||
# ## @param users[0].username Name of the env var (required)
|
||||
# ## @param users[0].password Value for the env var (required)
|
||||
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
|
||||
# ## @param users[0].passwordSecret.name Name of the secret containing the password
|
||||
# ## @param users[0].passwordSecret.key Key in the secret containing the password
|
||||
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
||||
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
# users: []
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:1.70-bullseye
|
||||
FROM rust:bullseye
|
||||
|
||||
# Dependencies
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
|
||||
2124
grafana_dashboard.json
Normal file
2124
grafana_dashboard.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -11,6 +11,7 @@ RestartSec=1
|
||||
Environment=RUST_LOG=info
|
||||
LimitNOFILE=65536
|
||||
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
|
||||
ExecReload=/bin/kill -SIGHUP $MAINPID
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
@@ -179,7 +179,7 @@ primary_reads_enabled = true
|
||||
# `random`: picks a shard at random
|
||||
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
||||
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
||||
# no_shard_specified_behavior = "shard_0"
|
||||
# default_shard = "shard_0"
|
||||
|
||||
# So what if you wanted to implement a different hashing function,
|
||||
# or you've already built one and you want this pooler to use it?
|
||||
|
||||
4
postinst
4
postinst
@@ -7,3 +7,7 @@ systemctl enable pgcat
|
||||
if ! id pgcat 2> /dev/null; then
|
||||
useradd -s /usr/bin/false pgcat
|
||||
fi
|
||||
|
||||
if [ -f /etc/pgcat.toml ]; then
|
||||
systemctl start pgcat
|
||||
fi
|
||||
|
||||
14
src/admin.rs
14
src/admin.rs
@@ -55,7 +55,12 @@ where
|
||||
|
||||
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
|
||||
|
||||
match query_parts[0].to_ascii_uppercase().as_str() {
|
||||
match query_parts
|
||||
.first()
|
||||
.unwrap_or(&"")
|
||||
.to_ascii_uppercase()
|
||||
.as_str()
|
||||
{
|
||||
"BAN" => {
|
||||
trace!("BAN");
|
||||
ban(stream, query_parts).await
|
||||
@@ -84,7 +89,12 @@ where
|
||||
trace!("SHUTDOWN");
|
||||
shutdown(stream).await
|
||||
}
|
||||
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
|
||||
"SHOW" => match query_parts
|
||||
.get(1)
|
||||
.unwrap_or(&"")
|
||||
.to_ascii_uppercase()
|
||||
.as_str()
|
||||
{
|
||||
"HELP" => {
|
||||
trace!("SHOW HELP");
|
||||
show_help(stream).await
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::config::AuthType;
|
||||
use crate::errors::Error;
|
||||
use crate::pool::ConnectionPool;
|
||||
use crate::server::Server;
|
||||
@@ -71,6 +72,7 @@ impl AuthPassthrough {
|
||||
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
||||
let auth_user = crate::config::User {
|
||||
username: self.user.clone(),
|
||||
auth_type: AuthType::MD5,
|
||||
password: Some(self.password.clone()),
|
||||
server_username: None,
|
||||
server_password: None,
|
||||
|
||||
318
src/client.rs
318
src/client.rs
@@ -14,7 +14,9 @@ use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
||||
use crate::auth_passthrough::refetch_auth_hash;
|
||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
||||
use crate::config::{
|
||||
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
|
||||
};
|
||||
use crate::constants::*;
|
||||
use crate::messages::*;
|
||||
use crate::plugins::PluginOutput;
|
||||
@@ -463,8 +465,8 @@ where
|
||||
.count()
|
||||
== 1;
|
||||
|
||||
// Kick any client that's not admin while we're in admin-only mode.
|
||||
if !admin && admin_only {
|
||||
// Kick any client that's not admin while we're in admin-only mode.
|
||||
debug!(
|
||||
"Rejecting non-admin connection to {} when in admin only mode",
|
||||
pool_name
|
||||
@@ -481,72 +483,76 @@ where
|
||||
let process_id: i32 = rand::random();
|
||||
let secret_key: i32 = rand::random();
|
||||
|
||||
// Perform MD5 authentication.
|
||||
// TODO: Add SASL support.
|
||||
let salt = md5_challenge(&mut write).await?;
|
||||
|
||||
let code = match read.read_u8().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password code".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// PasswordMessage
|
||||
if code as char != 'p' {
|
||||
return Err(Error::ProtocolSyncError(format!(
|
||||
"Expected p, got {}",
|
||||
code as char
|
||||
)));
|
||||
}
|
||||
|
||||
let len = match read.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message length".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||
|
||||
match read.read_exact(&mut password_response).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let mut prepared_statements_enabled = false;
|
||||
|
||||
// Authenticate admin user.
|
||||
let (transaction_mode, mut server_parameters) = if admin {
|
||||
let config = get_config();
|
||||
// TODO: Add SASL support.
|
||||
// Perform MD5 authentication.
|
||||
match config.general.admin_auth_type {
|
||||
AuthType::Trust => (),
|
||||
AuthType::MD5 => {
|
||||
let salt = md5_challenge(&mut write).await?;
|
||||
|
||||
// Compare server and client hashes.
|
||||
let password_hash = md5_hash_password(
|
||||
&config.general.admin_username,
|
||||
&config.general.admin_password,
|
||||
&salt,
|
||||
);
|
||||
let code = match read.read_u8().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password code".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
if password_hash != password_response {
|
||||
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
||||
// PasswordMessage
|
||||
if code as char != 'p' {
|
||||
return Err(Error::ProtocolSyncError(format!(
|
||||
"Expected p, got {}",
|
||||
code as char
|
||||
)));
|
||||
}
|
||||
|
||||
warn!("{}", error);
|
||||
wrong_password(&mut write, username).await?;
|
||||
let len = match read.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message length".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
return Err(error);
|
||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||
|
||||
match read.read_exact(&mut password_response).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// Compare server and client hashes.
|
||||
let password_hash = md5_hash_password(
|
||||
&config.general.admin_username,
|
||||
&config.general.admin_password,
|
||||
&salt,
|
||||
);
|
||||
|
||||
if password_hash != password_response {
|
||||
let error =
|
||||
Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
||||
|
||||
warn!("{}", error);
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(false, generate_server_parameters_for_admin())
|
||||
}
|
||||
// Authenticate normal user.
|
||||
@@ -573,92 +579,143 @@ where
|
||||
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
||||
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
||||
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
||||
let password_hash = if let Some(password) = &pool.settings.user.password {
|
||||
Some(md5_hash_password(username, password, &salt))
|
||||
} else {
|
||||
if !get_config().is_auth_query_configured() {
|
||||
wrong_password(&mut write, username).await?;
|
||||
return Err(Error::ClientAuthImpossible(username.into()));
|
||||
}
|
||||
match pool.settings.user.auth_type {
|
||||
AuthType::Trust => (),
|
||||
AuthType::MD5 => {
|
||||
// Perform MD5 authentication.
|
||||
// TODO: Add SASL support.
|
||||
let salt = md5_challenge(&mut write).await?;
|
||||
|
||||
let mut hash = (*pool.auth_hash.read()).clone();
|
||||
let code = match read.read_u8().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password code".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
if hash.is_none() {
|
||||
warn!(
|
||||
"Query auth configured \
|
||||
but no hash password found \
|
||||
for pool {}. Will try to refetch it.",
|
||||
pool_name
|
||||
);
|
||||
// PasswordMessage
|
||||
if code as char != 'p' {
|
||||
return Err(Error::ProtocolSyncError(format!(
|
||||
"Expected p, got {}",
|
||||
code as char
|
||||
)));
|
||||
}
|
||||
|
||||
match refetch_auth_hash(&pool).await {
|
||||
Ok(fetched_hash) => {
|
||||
warn!("Password for {}, obtained. Updating.", client_identifier);
|
||||
let len = match read.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message length".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||
|
||||
match read.read_exact(&mut password_response).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let password_hash = if let Some(password) = &pool.settings.user.password {
|
||||
Some(md5_hash_password(username, password, &salt))
|
||||
} else {
|
||||
if !get_config().is_auth_query_configured() {
|
||||
wrong_password(&mut write, username).await?;
|
||||
return Err(Error::ClientAuthImpossible(username.into()));
|
||||
}
|
||||
|
||||
let mut hash = (*pool.auth_hash.read()).clone();
|
||||
|
||||
if hash.is_none() {
|
||||
warn!(
|
||||
"Query auth configured \
|
||||
but no hash password found \
|
||||
for pool {}. Will try to refetch it.",
|
||||
pool_name
|
||||
);
|
||||
|
||||
match refetch_auth_hash(&pool).await {
|
||||
Ok(fetched_hash) => {
|
||||
warn!(
|
||||
"Password for {}, obtained. Updating.",
|
||||
client_identifier
|
||||
);
|
||||
|
||||
{
|
||||
let mut pool_auth_hash = pool.auth_hash.write();
|
||||
*pool_auth_hash = Some(fetched_hash.clone());
|
||||
}
|
||||
|
||||
hash = Some(fetched_hash);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(Error::ClientAuthPassthroughError(
|
||||
err.to_string(),
|
||||
client_identifier,
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
||||
};
|
||||
|
||||
// Once we have the resulting hash, we compare with what the client gave us.
|
||||
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
||||
// to see if the password has changed since the pool was created.
|
||||
//
|
||||
// @TODO: we could end up fetching again the same password twice (see above).
|
||||
if password_hash.unwrap() != password_response {
|
||||
warn!(
|
||||
"Invalid password {}, will try to refetch it.",
|
||||
client_identifier
|
||||
);
|
||||
|
||||
let fetched_hash = match refetch_auth_hash(&pool).await {
|
||||
Ok(fetched_hash) => fetched_hash,
|
||||
Err(err) => {
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
||||
|
||||
// Ok password changed in server an auth is possible.
|
||||
if new_password_hash == password_response {
|
||||
warn!(
|
||||
"Password for {}, changed in server. Updating.",
|
||||
client_identifier
|
||||
);
|
||||
|
||||
{
|
||||
let mut pool_auth_hash = pool.auth_hash.write();
|
||||
*pool_auth_hash = Some(fetched_hash.clone());
|
||||
*pool_auth_hash = Some(fetched_hash);
|
||||
}
|
||||
|
||||
hash = Some(fetched_hash);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
} else {
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(Error::ClientAuthPassthroughError(
|
||||
err.to_string(),
|
||||
return Err(Error::ClientGeneralError(
|
||||
"Invalid password".into(),
|
||||
client_identifier,
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
||||
};
|
||||
|
||||
// Once we have the resulting hash, we compare with what the client gave us.
|
||||
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
||||
// to see if the password has changed since the pool was created.
|
||||
//
|
||||
// @TODO: we could end up fetching again the same password twice (see above).
|
||||
if password_hash.unwrap() != password_response {
|
||||
warn!(
|
||||
"Invalid password {}, will try to refetch it.",
|
||||
client_identifier
|
||||
);
|
||||
|
||||
let fetched_hash = match refetch_auth_hash(&pool).await {
|
||||
Ok(fetched_hash) => fetched_hash,
|
||||
Err(err) => {
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
||||
|
||||
// Ok password changed in server an auth is possible.
|
||||
if new_password_hash == password_response {
|
||||
warn!(
|
||||
"Password for {}, changed in server. Updating.",
|
||||
client_identifier
|
||||
);
|
||||
|
||||
{
|
||||
let mut pool_auth_hash = pool.auth_hash.write();
|
||||
*pool_auth_hash = Some(fetched_hash);
|
||||
}
|
||||
} else {
|
||||
wrong_password(&mut write, username).await?;
|
||||
return Err(Error::ClientGeneralError(
|
||||
"Invalid password".into(),
|
||||
client_identifier,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
||||
prepared_statements_enabled =
|
||||
transaction_mode && pool.prepared_statement_cache.is_some();
|
||||
@@ -824,6 +881,7 @@ where
|
||||
};
|
||||
|
||||
query_router.update_pool_settings(&pool.settings);
|
||||
query_router.set_default_role();
|
||||
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
|
||||
@@ -38,12 +38,12 @@ pub enum Role {
|
||||
Mirror,
|
||||
}
|
||||
|
||||
impl ToString for Role {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
Role::Primary => "primary".to_string(),
|
||||
Role::Replica => "replica".to_string(),
|
||||
Role::Mirror => "mirror".to_string(),
|
||||
impl std::fmt::Display for Role {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Role::Primary => write!(f, "primary"),
|
||||
Role::Replica => write!(f, "replica"),
|
||||
Role::Mirror => write!(f, "mirror"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -208,6 +208,9 @@ impl Address {
|
||||
pub struct User {
|
||||
pub username: String,
|
||||
pub password: Option<String>,
|
||||
|
||||
#[serde(default = "User::default_auth_type")]
|
||||
pub auth_type: AuthType,
|
||||
pub server_username: Option<String>,
|
||||
pub server_password: Option<String>,
|
||||
pub pool_size: u32,
|
||||
@@ -225,6 +228,7 @@ impl Default for User {
|
||||
User {
|
||||
username: String::from("postgres"),
|
||||
password: None,
|
||||
auth_type: AuthType::MD5,
|
||||
server_username: None,
|
||||
server_password: None,
|
||||
pool_size: 15,
|
||||
@@ -239,6 +243,10 @@ impl Default for User {
|
||||
}
|
||||
|
||||
impl User {
|
||||
pub fn default_auth_type() -> AuthType {
|
||||
AuthType::MD5
|
||||
}
|
||||
|
||||
fn validate(&self) -> Result<(), Error> {
|
||||
if let Some(min_pool_size) = self.min_pool_size {
|
||||
if min_pool_size > self.pool_size {
|
||||
@@ -334,6 +342,9 @@ pub struct General {
|
||||
pub admin_username: String,
|
||||
pub admin_password: String,
|
||||
|
||||
#[serde(default = "General::default_admin_auth_type")]
|
||||
pub admin_auth_type: AuthType,
|
||||
|
||||
#[serde(default = "General::default_validate_config")]
|
||||
pub validate_config: bool,
|
||||
|
||||
@@ -348,6 +359,10 @@ impl General {
|
||||
"0.0.0.0".into()
|
||||
}
|
||||
|
||||
pub fn default_admin_auth_type() -> AuthType {
|
||||
AuthType::MD5
|
||||
}
|
||||
|
||||
pub fn default_port() -> u16 {
|
||||
5432
|
||||
}
|
||||
@@ -456,6 +471,7 @@ impl Default for General {
|
||||
verify_server_certificate: false,
|
||||
admin_username: String::from("admin"),
|
||||
admin_password: String::from("admin"),
|
||||
admin_auth_type: AuthType::MD5,
|
||||
validate_config: true,
|
||||
auth_query: None,
|
||||
auth_query_user: None,
|
||||
@@ -476,11 +492,20 @@ pub enum PoolMode {
|
||||
Session,
|
||||
}
|
||||
|
||||
impl ToString for PoolMode {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
PoolMode::Transaction => "transaction".to_string(),
|
||||
PoolMode::Session => "session".to_string(),
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
|
||||
pub enum AuthType {
|
||||
#[serde(alias = "trust", alias = "Trust")]
|
||||
Trust,
|
||||
|
||||
#[serde(alias = "md5", alias = "MD5")]
|
||||
MD5,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PoolMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
PoolMode::Transaction => write!(f, "transaction"),
|
||||
PoolMode::Session => write!(f, "session"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -493,12 +518,13 @@ pub enum LoadBalancingMode {
|
||||
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
|
||||
LeastOutstandingConnections,
|
||||
}
|
||||
impl ToString for LoadBalancingMode {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
LoadBalancingMode::Random => "random".to_string(),
|
||||
|
||||
impl std::fmt::Display for LoadBalancingMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
LoadBalancingMode::Random => write!(f, "random"),
|
||||
LoadBalancingMode::LeastOutstandingConnections => {
|
||||
"least_outstanding_connections".to_string()
|
||||
write!(f, "least_outstanding_connections")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -999,15 +1025,17 @@ impl Config {
|
||||
pub fn fill_up_auth_query_config(&mut self) {
|
||||
for (_name, pool) in self.pools.iter_mut() {
|
||||
if pool.auth_query.is_none() {
|
||||
pool.auth_query = self.general.auth_query.clone();
|
||||
pool.auth_query.clone_from(&self.general.auth_query);
|
||||
}
|
||||
|
||||
if pool.auth_query_user.is_none() {
|
||||
pool.auth_query_user = self.general.auth_query_user.clone();
|
||||
pool.auth_query_user
|
||||
.clone_from(&self.general.auth_query_user);
|
||||
}
|
||||
|
||||
if pool.auth_query_password.is_none() {
|
||||
pool.auth_query_password = self.general.auth_query_password.clone();
|
||||
pool.auth_query_password
|
||||
.clone_from(&self.general.auth_query_password);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1155,7 +1183,7 @@ impl Config {
|
||||
"Default max server lifetime: {}ms",
|
||||
self.general.server_lifetime
|
||||
);
|
||||
info!("Sever round robin: {}", self.general.server_round_robin);
|
||||
info!("Server round robin: {}", self.general.server_round_robin);
|
||||
match self.general.tls_certificate.clone() {
|
||||
Some(tls_certificate) => {
|
||||
info!("TLS certificate: {}", tls_certificate);
|
||||
|
||||
@@ -733,6 +733,10 @@ pub fn configure_socket(stream: &TcpStream) {
|
||||
}
|
||||
Err(err) => error!("Could not configure socket: {}", err),
|
||||
}
|
||||
match sock_ref.set_nodelay(true) {
|
||||
Ok(_) => (),
|
||||
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
|
||||
}
|
||||
}
|
||||
|
||||
pub trait BytesMutReader {
|
||||
|
||||
@@ -813,7 +813,7 @@ impl ConnectionPool {
|
||||
}
|
||||
}
|
||||
|
||||
client_stats.checkout_success();
|
||||
client_stats.checkout_error();
|
||||
|
||||
Err(Error::AllServersDown)
|
||||
}
|
||||
|
||||
@@ -1,23 +1,41 @@
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use http_body_util::Full;
|
||||
use hyper::body;
|
||||
use hyper::body::Bytes;
|
||||
|
||||
use hyper::server::conn::http1;
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{Method, Request, Response, StatusCode};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use log::{debug, error, info};
|
||||
use phf::phf_map;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use crate::config::Address;
|
||||
use crate::pool::{get_all_pools, PoolIdentifier};
|
||||
use crate::stats::get_server_stats;
|
||||
use crate::stats::pool::PoolStats;
|
||||
use crate::stats::{get_server_stats, ServerStats};
|
||||
|
||||
struct MetricHelpType {
|
||||
help: &'static str,
|
||||
ty: &'static str,
|
||||
}
|
||||
|
||||
struct ServerPrometheusStats {
|
||||
bytes_received: u64,
|
||||
bytes_sent: u64,
|
||||
transaction_count: u64,
|
||||
query_count: u64,
|
||||
error_count: u64,
|
||||
active_count: u64,
|
||||
idle_count: u64,
|
||||
login_count: u64,
|
||||
tested_count: u64,
|
||||
}
|
||||
|
||||
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
|
||||
// counters only increase
|
||||
// gauges can arbitrarily increase or decrease
|
||||
@@ -120,22 +138,46 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
|
||||
},
|
||||
"servers_bytes_received" => MetricHelpType {
|
||||
help: "Volume in bytes of network traffic received by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_bytes_sent" => MetricHelpType {
|
||||
help: "Volume in bytes of network traffic sent by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_transaction_count" => MetricHelpType {
|
||||
help: "Number of transactions executed by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_query_count" => MetricHelpType {
|
||||
help: "Number of queries executed by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_error_count" => MetricHelpType {
|
||||
help: "Number of errors",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_idle_count" => MetricHelpType {
|
||||
help: "Number of server connection in idle state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_active_count" => MetricHelpType {
|
||||
help: "Number of server connection in active state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_tested_count" => MetricHelpType {
|
||||
help: "Number of server connection in tested state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_login_count" => MetricHelpType {
|
||||
help: "Number of server connection in login state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_is_banned" => MetricHelpType {
|
||||
help: "0 if server is not banned, 1 if server is banned",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_is_paused" => MetricHelpType {
|
||||
help: "0 if server is not paused, 1 if server is paused",
|
||||
ty: "gauge",
|
||||
},
|
||||
"databases_pool_size" => MetricHelpType {
|
||||
@@ -158,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {
|
||||
|
||||
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let formatted_labels = self
|
||||
.labels
|
||||
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
|
||||
sorted_labels.sort_by_key(|&(key, _)| key);
|
||||
let formatted_labels = sorted_labels
|
||||
.iter()
|
||||
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
write!(
|
||||
f,
|
||||
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
|
||||
"{name}{{{formatted_labels}}} {value}",
|
||||
name = format_args!("pgcat_{}", self.name),
|
||||
help = self.help,
|
||||
ty = self.ty,
|
||||
formatted_labels = formatted_labels,
|
||||
value = self.value
|
||||
)
|
||||
@@ -203,7 +244,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("shard", address.shard.to_string());
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("username", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||
}
|
||||
@@ -218,7 +261,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("shard", address.shard.to_string());
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("username", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||
}
|
||||
@@ -229,7 +274,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("shard", address.shard.to_string());
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("username", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||
}
|
||||
@@ -241,9 +288,20 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
|
||||
Self::from_name(&format!("pools_{}", name), value, labels)
|
||||
}
|
||||
|
||||
fn get_header(&self) -> String {
|
||||
format!(
|
||||
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
|
||||
name = format_args!("pgcat_{}", self.name),
|
||||
help = self.help,
|
||||
ty = self.ty,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
|
||||
async fn prometheus_stats(
|
||||
request: Request<body::Incoming>,
|
||||
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
|
||||
match (request.method(), request.uri().path()) {
|
||||
(&Method::GET, "/metrics") => {
|
||||
let mut lines = Vec::new();
|
||||
@@ -251,6 +309,7 @@ async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hype
|
||||
push_pool_stats(&mut lines);
|
||||
push_server_stats(&mut lines);
|
||||
push_database_stats(&mut lines);
|
||||
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
|
||||
|
||||
Response::builder()
|
||||
.header("content-type", "text/plain; version=0.0.4")
|
||||
@@ -264,6 +323,7 @@ async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hype
|
||||
|
||||
// Adds metrics shown in a SHOW STATS admin command.
|
||||
fn push_address_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
@@ -273,7 +333,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_address(address, &key, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(key)
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -281,33 +344,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
||||
fn push_pool_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
let pool_stats = PoolStats::construct_pool_lookup();
|
||||
for (pool_id, stats) in pool_stats.iter() {
|
||||
for (name, value) in stats.clone() {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(name)
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
||||
fn push_database_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
let pool_config = pool.settings.clone();
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let pool_state = pool.pool_state(shard, server);
|
||||
|
||||
let metrics = vec![
|
||||
("pool_size", pool_config.user.pool_size),
|
||||
("current_connections", pool_state.connections),
|
||||
@@ -316,7 +399,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(key.to_string())
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -324,45 +410,73 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||
fn push_server_stats(lines: &mut Vec<String>) {
|
||||
let server_stats = get_server_stats();
|
||||
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
|
||||
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
|
||||
for (_, stats) in server_stats {
|
||||
server_stats_by_addresses.insert(stats.address_name(), stats);
|
||||
let entry = prom_stats
|
||||
.entry(stats.address_name())
|
||||
.or_insert(ServerPrometheusStats {
|
||||
bytes_received: 0,
|
||||
bytes_sent: 0,
|
||||
transaction_count: 0,
|
||||
query_count: 0,
|
||||
error_count: 0,
|
||||
active_count: 0,
|
||||
idle_count: 0,
|
||||
login_count: 0,
|
||||
tested_count: 0,
|
||||
});
|
||||
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
|
||||
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
|
||||
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
|
||||
entry.query_count += stats.query_count.load(Ordering::Relaxed);
|
||||
entry.error_count += stats.error_count.load(Ordering::Relaxed);
|
||||
match stats.state.load(Ordering::Relaxed) {
|
||||
crate::stats::ServerState::Login => entry.login_count += 1,
|
||||
crate::stats::ServerState::Active => entry.active_count += 1,
|
||||
crate::stats::ServerState::Tested => entry.tested_count += 1,
|
||||
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
||||
}
|
||||
}
|
||||
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
|
||||
if let Some(server_info) = prom_stats.get(&address.name()) {
|
||||
let metrics = [
|
||||
(
|
||||
"bytes_received",
|
||||
server_info.bytes_received.load(Ordering::Relaxed),
|
||||
),
|
||||
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
|
||||
(
|
||||
"transaction_count",
|
||||
server_info.transaction_count.load(Ordering::Relaxed),
|
||||
),
|
||||
(
|
||||
"query_count",
|
||||
server_info.query_count.load(Ordering::Relaxed),
|
||||
),
|
||||
(
|
||||
"error_count",
|
||||
server_info.error_count.load(Ordering::Relaxed),
|
||||
),
|
||||
("bytes_received", server_info.bytes_received),
|
||||
("bytes_sent", server_info.bytes_sent),
|
||||
("transaction_count", server_info.transaction_count),
|
||||
("query_count", server_info.query_count),
|
||||
("error_count", server_info.error_count),
|
||||
("idle_count", server_info.idle_count),
|
||||
("active_count", server_info.active_count),
|
||||
("login_count", server_info.login_count),
|
||||
("tested_count", server_info.tested_count),
|
||||
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
|
||||
("is_paused", if pool.paused() { 1 } else { 0 }),
|
||||
];
|
||||
for (key, value) in metrics {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(key.to_string())
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -371,17 +485,46 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||
let http_service_factory =
|
||||
make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) });
|
||||
let server = Server::bind(&http_addr).serve(http_service_factory);
|
||||
let listener = TcpListener::bind(http_addr);
|
||||
let listener = match listener.await {
|
||||
Ok(listener) => listener,
|
||||
Err(e) => {
|
||||
error!("Failed to bind prometheus server to HTTP address: {}.", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"Exposing prometheus metrics on http://{}/metrics.",
|
||||
http_addr
|
||||
);
|
||||
if let Err(e) = server.await {
|
||||
error!("Failed to run HTTP server: {}.", e);
|
||||
loop {
|
||||
let stream = match listener.accept().await {
|
||||
Ok((stream, _)) => stream,
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(err) = http1::Builder::new()
|
||||
.serve_connection(io, service_fn(prometheus_stats))
|
||||
.await
|
||||
{
|
||||
eprintln!("Error serving HTTP connection for metrics: {:?}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -386,6 +386,18 @@ impl QueryRouter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines if a query is a mutation or not.
|
||||
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
|
||||
use sqlparser::ast::*;
|
||||
|
||||
match q.body.as_ref() {
|
||||
SetExpr::Insert(_) => true,
|
||||
SetExpr::Update(_) => true,
|
||||
SetExpr::Query(q) => Self::is_mutation_query(q),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to infer which server to connect to based on the contents of the query.
|
||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
||||
if !self.pool_settings.query_parser_read_write_splitting {
|
||||
@@ -427,8 +439,13 @@ impl QueryRouter {
|
||||
None => (),
|
||||
};
|
||||
|
||||
// If we already visited a write statement, we should be going to the primary.
|
||||
if !visited_write_statement {
|
||||
let has_locks = !query.locks.is_empty();
|
||||
let has_mutation = Self::is_mutation_query(query);
|
||||
|
||||
if has_locks || has_mutation {
|
||||
self.active_role = Some(Role::Primary);
|
||||
} else if !visited_write_statement {
|
||||
// If we already visited a write statement, we should be going to the primary.
|
||||
self.active_role = match self.primary_reads_enabled() {
|
||||
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
|
||||
true => None, // Any server role is fine in this case.
|
||||
@@ -499,6 +516,7 @@ impl QueryRouter {
|
||||
table: _,
|
||||
on: _,
|
||||
returning: _,
|
||||
ignore: _,
|
||||
} => {
|
||||
// Not supported in postgres.
|
||||
assert!(or.is_none());
|
||||
@@ -506,7 +524,9 @@ impl QueryRouter {
|
||||
assert!(after_columns.is_empty());
|
||||
|
||||
Self::process_table(table_name, &mut table_names);
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
||||
if let Some(source) = source {
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
||||
}
|
||||
}
|
||||
Delete {
|
||||
tables,
|
||||
@@ -514,6 +534,8 @@ impl QueryRouter {
|
||||
using,
|
||||
selection,
|
||||
returning: _,
|
||||
order_by: _,
|
||||
limit: _,
|
||||
} => {
|
||||
if let Some(expr) = selection {
|
||||
exprs.push(expr.clone());
|
||||
@@ -1039,6 +1061,11 @@ impl QueryRouter {
|
||||
self.active_shard
|
||||
}
|
||||
|
||||
/// Set active_role as the default_role specified in the pool.
|
||||
pub fn set_default_role(&mut self) {
|
||||
self.active_role = self.pool_settings.default_role;
|
||||
}
|
||||
|
||||
/// Get the current desired server role we should be talking to.
|
||||
pub fn role(&self) -> Option<Role> {
|
||||
self.active_role
|
||||
@@ -1104,6 +1131,26 @@ mod test {
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_split_cte_queries() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
|
||||
let query = simple_query(
|
||||
"WITH t AS (
|
||||
SELECT id FROM users WHERE name ILIKE '%ja%'
|
||||
)
|
||||
UPDATE user_languages
|
||||
SET settings = '{}'
|
||||
FROM t WHERE t.id = user_id;",
|
||||
);
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_replica() {
|
||||
QueryRouter::setup();
|
||||
@@ -1153,6 +1200,29 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_select_for_update() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
|
||||
let queries_in_primary_role = vec![
|
||||
simple_query("BEGIN"), // Transaction start
|
||||
simple_query("SELECT * FROM items WHERE id = 5 FOR UPDATE"),
|
||||
simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
|
||||
];
|
||||
|
||||
for query in queries_in_primary_role {
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
|
||||
// query without lock do not change role
|
||||
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_primary_reads_enabled() {
|
||||
QueryRouter::setup();
|
||||
@@ -1367,6 +1437,19 @@ mod test {
|
||||
assert!(!qr.query_parser_enabled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_query_parser() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
|
||||
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
|
||||
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_update_from_pool_settings() {
|
||||
QueryRouter::setup();
|
||||
|
||||
@@ -14,11 +14,11 @@ pub enum ShardingFunction {
|
||||
Sha1,
|
||||
}
|
||||
|
||||
impl ToString for ShardingFunction {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
|
||||
ShardingFunction::Sha1 => "sha1".to_string(),
|
||||
impl std::fmt::Display for ShardingFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"),
|
||||
ShardingFunction::Sha1 => write!(f, "sha1"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
34
start_test_env.sh
Executable file
34
start_test_env.sh
Executable file
@@ -0,0 +1,34 @@
|
||||
GREEN="\033[0;32m"
|
||||
RED="\033[0;31m"
|
||||
BLUE="\033[0;34m"
|
||||
RESET="\033[0m"
|
||||
|
||||
|
||||
cd tests/docker/
|
||||
docker compose kill main || true
|
||||
docker compose build main
|
||||
docker compose down
|
||||
docker compose up -d
|
||||
# wait for the container to start
|
||||
while ! docker compose exec main ls; do
|
||||
echo "Waiting for test environment to start"
|
||||
sleep 1
|
||||
done
|
||||
echo "==================================="
|
||||
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
|
||||
docker compose exec --workdir /app main cargo build
|
||||
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
|
||||
docker compose exec --workdir /app/tests/ruby main bundle install
|
||||
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
|
||||
echo "Interactive test environment ready"
|
||||
echo "To run integration tests, you can use the following commands:"
|
||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||
echo "You can rebuild PgCat from within the container by running"
|
||||
echo -e " ${GREEN}cargo build${RESET}"
|
||||
echo "and then run the tests again"
|
||||
echo "==================================="
|
||||
docker compose exec --workdir /app/tests main bash
|
||||
@@ -1,4 +1,3 @@
|
||||
version: "3"
|
||||
services:
|
||||
pg1:
|
||||
image: postgres:14
|
||||
@@ -48,6 +47,8 @@ services:
|
||||
main:
|
||||
build: .
|
||||
command: ["bash", "/app/tests/docker/run.sh"]
|
||||
environment:
|
||||
- INTERACTIVE_TEST_ENVIRONMENT=true
|
||||
volumes:
|
||||
- ../../:/app/
|
||||
- /app/target/
|
||||
|
||||
@@ -5,6 +5,38 @@ rm /app/*.profraw || true
|
||||
rm /app/pgcat.profdata || true
|
||||
rm -rf /app/cov || true
|
||||
|
||||
# Prepares the interactive test environment
|
||||
#
|
||||
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
|
||||
ports=(5432 7432 8432 9432 10432)
|
||||
for port in "${ports[@]}"; do
|
||||
is_it_up=0
|
||||
attempts=0
|
||||
while [ $is_it_up -eq 0 ]; do
|
||||
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "PostgreSQL on port $port is up."
|
||||
is_it_up=1
|
||||
else
|
||||
attempts=$((attempts+1))
|
||||
if [ $attempts -gt 10 ]; then
|
||||
echo "PostgreSQL on port $port is down, giving up."
|
||||
exit 1
|
||||
fi
|
||||
echo "PostgreSQL on port $port is down, waiting for it to start."
|
||||
sleep 1
|
||||
fi
|
||||
done
|
||||
done
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
sleep 100000000000000000
|
||||
exit 0
|
||||
fi
|
||||
|
||||
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
||||
export RUSTC_BOOTSTRAP=1
|
||||
export CARGO_INCREMENTAL=0
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
pytest
|
||||
psycopg2==2.9.3
|
||||
psutil==5.9.1
|
||||
psutil==5.9.1
|
||||
|
||||
71
tests/python/test_auth.py
Normal file
71
tests/python/test_auth.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import utils
|
||||
import signal
|
||||
|
||||
class TestTrustAuth:
|
||||
@classmethod
|
||||
def setup_method(cls):
|
||||
config= """
|
||||
[general]
|
||||
host = "0.0.0.0"
|
||||
port = 6432
|
||||
admin_username = "admin_user"
|
||||
admin_password = ""
|
||||
admin_auth_type = "trust"
|
||||
|
||||
[pools.sharded_db.users.0]
|
||||
username = "sharding_user"
|
||||
password = "sharding_user"
|
||||
auth_type = "trust"
|
||||
pool_size = 10
|
||||
min_pool_size = 1
|
||||
pool_mode = "transaction"
|
||||
|
||||
[pools.sharded_db.shards.0]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
]
|
||||
database = "shard0"
|
||||
"""
|
||||
utils.pgcat_generic_start(config)
|
||||
|
||||
@classmethod
|
||||
def teardown_method(self):
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
def test_admin_trust_auth(self):
|
||||
conn, cur = utils.connect_db_trust(admin=True)
|
||||
cur.execute("SHOW POOLS")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
|
||||
def test_normal_trust_auth(self):
|
||||
conn, cur = utils.connect_db_trust(autocommit=False)
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
|
||||
class TestMD5Auth:
|
||||
@classmethod
|
||||
def setup_method(cls):
|
||||
utils.pgcat_start()
|
||||
|
||||
@classmethod
|
||||
def teardown_method(self):
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
def test_normal_db_access(self):
|
||||
conn, cur = utils.connect_db(autocommit=False)
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
|
||||
def test_admin_db_access(self):
|
||||
conn, cur = utils.connect_db(admin=True)
|
||||
|
||||
cur.execute("SHOW POOLS")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
@@ -1,84 +1,12 @@
|
||||
from typing import Tuple
|
||||
import psycopg2
|
||||
import psutil
|
||||
import os
|
||||
|
||||
import signal
|
||||
import time
|
||||
|
||||
import psycopg2
|
||||
import utils
|
||||
|
||||
SHUTDOWN_TIMEOUT = 5
|
||||
|
||||
PGCAT_HOST = "127.0.0.1"
|
||||
PGCAT_PORT = "6432"
|
||||
|
||||
|
||||
def pgcat_start():
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
if not os.system('pgrep pgcat'):
|
||||
raise Exception("pgcat not closed after SIGTERM")
|
||||
|
||||
|
||||
def connect_db(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
password = "admin_pass"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
password = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
|
||||
return (conn, cur)
|
||||
|
||||
|
||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_normal_db_access():
|
||||
pgcat_start()
|
||||
conn, cur = connect_db(autocommit=False)
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
cleanup_conn(conn, cur)
|
||||
|
||||
|
||||
def test_admin_db_access():
|
||||
conn, cur = connect_db(admin=True)
|
||||
|
||||
cur.execute("SHOW POOLS")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
cleanup_conn(conn, cur)
|
||||
|
||||
|
||||
def test_shutdown_logic():
|
||||
|
||||
@@ -86,17 +14,17 @@ def test_shutdown_logic():
|
||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and send query (not in transaction)
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
cur.execute("COMMIT;")
|
||||
|
||||
# Send sigint to pgcat
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||
@@ -108,18 +36,18 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint")
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = connect_db()
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
conn, cur = utils.connect_db()
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -138,24 +66,24 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint")
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE TRANSACTION WITH SIGINT
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||
@@ -165,18 +93,18 @@ def test_shutdown_logic():
|
||||
# Fail if query fails since server closed
|
||||
raise Exception("Server closed while in transaction", e.pgerror)
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = connect_db()
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
conn, cur = utils.connect_db()
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -194,30 +122,30 @@ def test_shutdown_logic():
|
||||
# Fail if query fails since server closed
|
||||
raise Exception("Server closed while in transaction", e.pgerror)
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
cur.execute("SELECT 1;")
|
||||
cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
time_taken = time.perf_counter() - start
|
||||
if time_taken > 0.1:
|
||||
@@ -227,49 +155,49 @@ def test_shutdown_logic():
|
||||
else:
|
||||
raise Exception("Able connect to database during shutdown")
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
conn, cur = connect_db(admin=True)
|
||||
conn, cur = utils.connect_db(admin=True)
|
||||
cur.execute("SHOW DATABASES;")
|
||||
cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception(e)
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
admin_cur.execute("SHOW DATABASES;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
@@ -277,24 +205,24 @@ def test_shutdown_logic():
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception("Could not execute admin command:", e)
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
|
||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||
@@ -308,12 +236,7 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint and expected timeout")
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
|
||||
|
||||
test_normal_db_access()
|
||||
test_admin_db_access()
|
||||
test_shutdown_logic()
|
||||
110
tests/python/utils.py
Normal file
110
tests/python/utils.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
from typing import Tuple
|
||||
import tempfile
|
||||
|
||||
import psutil
|
||||
import psycopg2
|
||||
|
||||
PGCAT_HOST = "127.0.0.1"
|
||||
PGCAT_PORT = "6432"
|
||||
|
||||
|
||||
def _pgcat_start(config_path: str):
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
os.system(f"./target/debug/pgcat {config_path} &")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def pgcat_start():
|
||||
_pgcat_start(config_path='.circleci/pgcat.toml')
|
||||
|
||||
|
||||
def pgcat_generic_start(config: str):
|
||||
tmp = tempfile.NamedTemporaryFile()
|
||||
with open(tmp.name, 'w') as f:
|
||||
f.write(config)
|
||||
_pgcat_start(config_path=tmp.name)
|
||||
|
||||
|
||||
def glauth_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if proc.name() == "glauth":
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
if not os.system('pgrep glauth'):
|
||||
raise Exception("glauth not closed after SIGTERM")
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
if not os.system('pgrep pgcat'):
|
||||
raise Exception("pgcat not closed after SIGTERM")
|
||||
|
||||
|
||||
def connect_db(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
password = "admin_pass"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
password = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
|
||||
return (conn, cur)
|
||||
|
||||
def connect_db_trust(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
|
||||
return (conn, cur)
|
||||
|
||||
|
||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||
cur.close()
|
||||
conn.close()
|
||||
@@ -1,22 +1,33 @@
|
||||
GEM
|
||||
remote: https://rubygems.org/
|
||||
specs:
|
||||
activemodel (7.0.4.1)
|
||||
activesupport (= 7.0.4.1)
|
||||
activerecord (7.0.4.1)
|
||||
activemodel (= 7.0.4.1)
|
||||
activesupport (= 7.0.4.1)
|
||||
activesupport (7.0.4.1)
|
||||
activemodel (7.1.4)
|
||||
activesupport (= 7.1.4)
|
||||
activerecord (7.1.4)
|
||||
activemodel (= 7.1.4)
|
||||
activesupport (= 7.1.4)
|
||||
timeout (>= 0.4.0)
|
||||
activesupport (7.1.4)
|
||||
base64
|
||||
bigdecimal
|
||||
concurrent-ruby (~> 1.0, >= 1.0.2)
|
||||
connection_pool (>= 2.2.5)
|
||||
drb
|
||||
i18n (>= 1.6, < 2)
|
||||
minitest (>= 5.1)
|
||||
mutex_m
|
||||
tzinfo (~> 2.0)
|
||||
ast (2.4.2)
|
||||
concurrent-ruby (1.1.10)
|
||||
base64 (0.2.0)
|
||||
bigdecimal (3.1.8)
|
||||
concurrent-ruby (1.3.4)
|
||||
connection_pool (2.4.1)
|
||||
diff-lcs (1.5.0)
|
||||
i18n (1.12.0)
|
||||
drb (2.2.1)
|
||||
i18n (1.14.5)
|
||||
concurrent-ruby (~> 1.0)
|
||||
minitest (5.17.0)
|
||||
minitest (5.25.1)
|
||||
mutex_m (0.2.0)
|
||||
parallel (1.22.1)
|
||||
parser (3.1.2.0)
|
||||
ast (~> 2.4.1)
|
||||
@@ -24,7 +35,8 @@ GEM
|
||||
pg (1.3.2)
|
||||
rainbow (3.1.1)
|
||||
regexp_parser (2.3.1)
|
||||
rexml (3.2.5)
|
||||
rexml (3.3.6)
|
||||
strscan
|
||||
rspec (3.11.0)
|
||||
rspec-core (~> 3.11.0)
|
||||
rspec-expectations (~> 3.11.0)
|
||||
@@ -50,10 +62,12 @@ GEM
|
||||
rubocop-ast (1.17.0)
|
||||
parser (>= 3.1.1.0)
|
||||
ruby-progressbar (1.11.0)
|
||||
strscan (3.1.0)
|
||||
timeout (0.4.1)
|
||||
toml (0.3.0)
|
||||
parslet (>= 1.8.0, < 3.0.0)
|
||||
toxiproxy (2.0.1)
|
||||
tzinfo (2.0.5)
|
||||
tzinfo (2.0.6)
|
||||
concurrent-ruby (~> 1.0)
|
||||
unicode-display_width (2.1.0)
|
||||
|
||||
|
||||
@@ -91,6 +91,27 @@ describe "Admin" do
|
||||
end
|
||||
end
|
||||
|
||||
[
|
||||
"SHOW ME THE MONEY",
|
||||
"SHOW ME THE WAY",
|
||||
"SHOW UP",
|
||||
"SHOWTIME",
|
||||
"HAMMER TIME",
|
||||
"SHOWN TO BE TRUE",
|
||||
"SHOW ",
|
||||
"SHOW ",
|
||||
"SHOW 1",
|
||||
";;;;;"
|
||||
].each do |cmd|
|
||||
describe "Bad command #{cmd}" do
|
||||
it "does not panic and responds with PG::SystemError" do
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
expect { admin_conn.async_exec(cmd) }.to raise_error(PG::SystemError).with_message(/Unsupported/)
|
||||
admin_conn.close
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "PAUSE" do
|
||||
it "pauses all pools" do
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
|
||||
@@ -56,6 +56,41 @@ describe "Random Load Balancing" do
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when all replicas are down " do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
|
||||
|
||||
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count = 0
|
||||
number_of_replicas = processes[:replicas].length
|
||||
|
||||
# Take down all replicas
|
||||
processes[:replicas].each(&:take_down)
|
||||
|
||||
(number_of_replicas + 1).times do |n|
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(number_of_replicas + 1)
|
||||
failed_count = 0
|
||||
|
||||
# Ban_time is configured to 60 so this reset will only work
|
||||
# if the replicas are unbanned automatically
|
||||
processes[:replicas].each(&:reset)
|
||||
|
||||
number_of_replicas.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
expect(failed_count).to eq(0)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Least Outstanding Queries Load Balancing" do
|
||||
@@ -161,4 +196,3 @@ describe "Least Outstanding Queries Load Balancing" do
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
682
tests/rust/Cargo.lock
generated
682
tests/rust/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -15,13 +15,11 @@ async fn test_prepared_statements() {
|
||||
for _ in 0..5 {
|
||||
let pool = pool.clone();
|
||||
let handle = tokio::task::spawn(async move {
|
||||
for _ in 0..1000 {
|
||||
match sqlx::query("SELECT one").fetch_all(&pool).await {
|
||||
for i in 0..1000 {
|
||||
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
if err.to_string().contains("prepared statement") {
|
||||
panic!("prepared statement error: {}", err);
|
||||
}
|
||||
panic!("prepared statement error: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user