mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
Compare commits
38 Commits
levkk-bump
...
mostafa_de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
efe8fb3203 | ||
|
|
f3fcf2a76e | ||
|
|
cacfadcf87 | ||
|
|
7a8823df50 | ||
|
|
ca34597002 | ||
|
|
2def40ea6a | ||
|
|
c05129018d | ||
|
|
4a7a6a8e7a | ||
|
|
29a476e190 | ||
|
|
ff2ff51125 | ||
|
|
81933b918d | ||
|
|
7cbc9178d8 | ||
|
|
2c8b2f0776 | ||
|
|
8f9a2b8e6f | ||
|
|
cbf4d58144 | ||
|
|
731aa047ba | ||
|
|
88dbcc21d1 | ||
|
|
c34b15bddc | ||
|
|
0b034a6831 | ||
|
|
966b8e093c | ||
|
|
c9270a47d4 | ||
|
|
0d94d0b90a | ||
|
|
358724f7a9 | ||
|
|
e1e4929d43 | ||
|
|
dc4d6edf17 | ||
|
|
ec3920d60f | ||
|
|
4c5498b915 | ||
|
|
0e8064b049 | ||
|
|
4dbef49ec9 | ||
|
|
bc07dc9c81 | ||
|
|
9b8166b313 | ||
|
|
e58d69f3de | ||
|
|
e76d720ffb | ||
|
|
998cc16a3c | ||
|
|
7c37da2fad | ||
|
|
b45c6b1d23 | ||
|
|
dae240d30c | ||
|
|
b52ea8e7f1 |
@@ -59,6 +59,7 @@ admin_password = "admin_pass"
|
||||
# session: one server connection per connected client
|
||||
# transaction: one server connection per client transaction
|
||||
pool_mode = "transaction"
|
||||
prepared_statements_cache_size = 500
|
||||
|
||||
# If the client doesn't specify, route traffic to
|
||||
# this role by default.
|
||||
@@ -141,6 +142,7 @@ query_parser_enabled = true
|
||||
query_parser_read_write_splitting = true
|
||||
primary_reads_enabled = true
|
||||
sharding_function = "pg_bigint_hash"
|
||||
prepared_statements_cache_size = 500
|
||||
|
||||
[pools.simple_db.users.0]
|
||||
username = "simple_user"
|
||||
|
||||
@@ -108,8 +108,24 @@ cd ../..
|
||||
pip3 install -r tests/python/requirements.txt
|
||||
python3 tests/python/tests.py || exit 1
|
||||
|
||||
|
||||
#
|
||||
# Go tests
|
||||
# Starts its own pgcat server
|
||||
#
|
||||
pushd tests/go
|
||||
/usr/local/go/bin/go test || exit 1
|
||||
popd
|
||||
|
||||
start_pgcat "info"
|
||||
|
||||
#
|
||||
# Rust tests
|
||||
#
|
||||
cd tests/rust
|
||||
cargo run
|
||||
cd ../../
|
||||
|
||||
# Admin tests
|
||||
export PGPASSWORD=admin_pass
|
||||
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
||||
|
||||
4
.github/dependabot.yml
vendored
4
.github/dependabot.yml
vendored
@@ -10,3 +10,7 @@ updates:
|
||||
commit-message:
|
||||
prefix: "chore(deps)"
|
||||
open-pull-requests-limit: 10
|
||||
- package-ecosystem: "github-actions"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
|
||||
20
.github/workflows/build-and-push.yaml
vendored
20
.github/workflows/build-and-push.yaml
vendored
@@ -2,7 +2,9 @@ name: Build and Push
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
paths:
|
||||
- '!charts/**.md'
|
||||
branches:
|
||||
- main
|
||||
tags:
|
||||
- v*
|
||||
@@ -21,14 +23,17 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout Repository
|
||||
uses: actions/checkout@v3
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Determine tags
|
||||
id: metadata
|
||||
uses: docker/metadata-action@v4
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: ${{ env.registry }}/${{ env.image-name }}
|
||||
tags: |
|
||||
@@ -40,15 +45,18 @@ jobs:
|
||||
type=raw,value=latest,enable={{ is_default_branch }}
|
||||
|
||||
- name: Log in to the Container registry
|
||||
uses: docker/login-action@v2.1.0
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ${{ env.registry }}
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Build and push ${{ env.image-name }}
|
||||
uses: docker/build-push-action@v3
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
platforms: linux/amd64,linux/arm64
|
||||
provenance: false
|
||||
push: true
|
||||
tags: ${{ steps.metadata.outputs.tags }}
|
||||
labels: ${{ steps.metadata.outputs.labels }}
|
||||
|
||||
50
.github/workflows/chart-lint-test.yaml
vendored
Normal file
50
.github/workflows/chart-lint-test.yaml
vendored
Normal file
@@ -0,0 +1,50 @@
|
||||
name: Lint and Test Charts
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
paths:
|
||||
- charts/**
|
||||
- '!charts/**.md'
|
||||
jobs:
|
||||
lint-test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3.1.0
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Helm
|
||||
uses: azure/setup-helm@v3
|
||||
with:
|
||||
version: v3.8.1
|
||||
|
||||
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4.1.0
|
||||
with:
|
||||
python-version: 3.7
|
||||
|
||||
- name: Set up chart-testing
|
||||
uses: helm/chart-testing-action@v2.2.1
|
||||
with:
|
||||
version: v3.5.1
|
||||
|
||||
- name: Run chart-testing (list-changed)
|
||||
id: list-changed
|
||||
run: |
|
||||
changed=$(ct list-changed --config ct.yaml)
|
||||
if [[ -n "$changed" ]]; then
|
||||
echo "changed=true" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: Run chart-testing (lint)
|
||||
run: ct lint --config ct.yaml
|
||||
|
||||
- name: Create kind cluster
|
||||
uses: helm/kind-action@v1.7.0
|
||||
if: steps.list-changed.outputs.changed == 'true'
|
||||
|
||||
- name: Run chart-testing (install)
|
||||
run: ct install --config ct.yaml
|
||||
40
.github/workflows/chart-release.yaml
vendored
Normal file
40
.github/workflows/chart-release.yaml
vendored
Normal file
@@ -0,0 +1,40 @@
|
||||
name: Release Charts
|
||||
|
||||
on:
|
||||
push:
|
||||
paths:
|
||||
- charts/**
|
||||
- '!**.md'
|
||||
branches:
|
||||
- main
|
||||
|
||||
jobs:
|
||||
release:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure Git
|
||||
run: |
|
||||
git config user.name "$GITHUB_ACTOR"
|
||||
git config user.email "$GITHUB_ACTOR@users.noreply.github.com"
|
||||
|
||||
- name: Install Helm
|
||||
uses: azure/setup-helm@5119fcb9089d432beecbf79bb2c7915207344b78 # v3.5
|
||||
with:
|
||||
version: v3.13.0
|
||||
|
||||
- name: Run chart-releaser
|
||||
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
|
||||
with:
|
||||
charts_dir: charts
|
||||
config: cr.yaml
|
||||
env:
|
||||
CR_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
|
||||
48
.github/workflows/generate-chart-readme.yaml
vendored
Normal file
48
.github/workflows/generate-chart-readme.yaml
vendored
Normal file
@@ -0,0 +1,48 @@
|
||||
name: '[CI/CD] Update README metadata'
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
branches:
|
||||
- main
|
||||
paths:
|
||||
- 'charts/*/values.yaml'
|
||||
# Remove all permissions by default
|
||||
permissions: {}
|
||||
jobs:
|
||||
update-readme-metadata:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
steps:
|
||||
- name: Install readme-generator-for-helm
|
||||
run: npm install -g @bitnami/readme-generator-for-helm
|
||||
- name: Checkout
|
||||
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608
|
||||
with:
|
||||
path: charts
|
||||
ref: ${{github.event.pull_request.head.ref}}
|
||||
repository: ${{github.event.pull_request.head.repo.full_name}}
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- name: Execute readme-generator-for-helm
|
||||
env:
|
||||
DIFF_URL: "${{github.event.pull_request.diff_url}}"
|
||||
TEMP_FILE: "${{runner.temp}}/pr-${{github.event.number}}.diff"
|
||||
run: |
|
||||
# This request doesn't consume API calls.
|
||||
curl -Lkso $TEMP_FILE $DIFF_URL
|
||||
files_changed="$(sed -nr 's/[\-\+]{3} [ab]\/(.*)/\1/p' $TEMP_FILE | sort | uniq)"
|
||||
# Adding || true to avoid "Process exited with code 1" errors
|
||||
charts_dirs_changed="$(echo "$files_changed" | xargs dirname | grep -o "pgcat/[^/]*" | sort | uniq || true)"
|
||||
for chart in ${charts_dirs_changed}; do
|
||||
echo "Updating README.md for ${chart}"
|
||||
readme-generator --values "charts/${chart}/values.yaml" --readme "charts/${chart}/README.md" --schema "/tmp/schema.json"
|
||||
done
|
||||
- name: Push changes
|
||||
run: |
|
||||
# Push all the changes
|
||||
cd charts
|
||||
if git status -s | grep pgcat; then
|
||||
git config user.name "$GITHUB_ACTOR"
|
||||
git config user.email "$GITHUB_ACTOR@users.noreply.github.com"
|
||||
git add . && git commit -am "Update README.md with readme-generator-for-helm" --signoff && git push
|
||||
fi
|
||||
@@ -6,6 +6,32 @@ Thank you for contributing! Just a few tips here:
|
||||
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||
|
||||
## How to run the integration tests locally and iterate on them
|
||||
We have integration tests written in Ruby, Python, Go and Rust.
|
||||
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
||||
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
||||
|
||||
|
||||
Quite simply, make sure you have docker installed and then run
|
||||
`./start_test_env.sh`
|
||||
|
||||
That is it!
|
||||
|
||||
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
|
||||
|
||||
Once the environment is ready, you can run the tests by running
|
||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||
Python: `cd /app && python3 tests/python/tests.py`
|
||||
Rust: `cd /app/tests/rust && cargo run`
|
||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||
|
||||
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
|
||||
To rebuild PgCat, just run `cargo build` within the container under `/app`
|
||||
|
||||

|
||||
|
||||
|
||||
|
||||
Happy hacking!
|
||||
|
||||
## TODOs
|
||||
|
||||
104
Cargo.lock
generated
104
Cargo.lock
generated
@@ -146,6 +146,12 @@ dependencies = [
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
|
||||
|
||||
[[package]]
|
||||
name = "atomic_enum"
|
||||
version = "0.2.0"
|
||||
@@ -542,29 +548,23 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.20"
|
||||
version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049"
|
||||
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
|
||||
dependencies = [
|
||||
"atomic-waker",
|
||||
"bytes",
|
||||
"fnv",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"http",
|
||||
"indexmap 1.9.3",
|
||||
"indexmap",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.14.0"
|
||||
@@ -609,9 +609,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "0.2.9"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
|
||||
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@@ -620,12 +620,24 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "http-body"
|
||||
version = "0.4.5"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
|
||||
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http-body-util"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
@@ -643,13 +655,12 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.27"
|
||||
version = "1.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
|
||||
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
@@ -658,13 +669,26 @@ dependencies = [
|
||||
"httpdate",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
"socket2 0.4.9",
|
||||
"smallvec",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"want",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone"
|
||||
version = "0.1.57"
|
||||
@@ -709,16 +733,6 @@ dependencies = [
|
||||
"unicode-normalization",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.0.0"
|
||||
@@ -726,7 +740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown 0.14.0",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -848,7 +862,7 @@ version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.0",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1020,7 +1034,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
|
||||
|
||||
[[package]]
|
||||
name = "pgcat"
|
||||
version = "1.1.2-dev1"
|
||||
version = "1.2.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
@@ -1034,7 +1048,9 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"futures",
|
||||
"hmac",
|
||||
"http-body-util",
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"itertools",
|
||||
"jemallocator",
|
||||
"log",
|
||||
@@ -1478,9 +1494,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.11.0"
|
||||
version = "1.13.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
|
||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
@@ -1510,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.34.0"
|
||||
version = "0.41.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
|
||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
||||
dependencies = [
|
||||
"log",
|
||||
"sqlparser_derive",
|
||||
@@ -1520,13 +1536,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser_derive"
|
||||
version = "0.1.1"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
|
||||
checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1741,19 +1757,13 @@ version = "0.19.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
|
||||
dependencies = [
|
||||
"indexmap 2.0.0",
|
||||
"indexmap",
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.37"
|
||||
|
||||
15
Cargo.toml
15
Cargo.toml
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "pgcat"
|
||||
version = "1.1.2-dev1"
|
||||
version = "1.2.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
sqlparser = {version = "0.34", features = ["visitor"] }
|
||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
parking_lot = "0.12.1"
|
||||
@@ -29,7 +29,9 @@ base64 = "0.21"
|
||||
stringprep = "0.1"
|
||||
tokio-rustls = "0.24"
|
||||
rustls-pemfile = "1"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
http-body-util = "0.1.2"
|
||||
hyper = { version = "1.4.1", features = ["full"] }
|
||||
hyper-util = { version = "0.1.7", features = ["tokio"] }
|
||||
phf = { version = "0.11.1", features = ["macros"] }
|
||||
exitcode = "1.1.2"
|
||||
futures = "0.3"
|
||||
@@ -47,9 +49,12 @@ serde_json = "1"
|
||||
itertools = "0.10"
|
||||
clap = { version = "4.3.1", features = ["derive", "env"] }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
|
||||
tracing-subscriber = { version = "0.3.17", features = [
|
||||
"json",
|
||||
"env-filter",
|
||||
"std",
|
||||
] }
|
||||
lru = "0.12.0"
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
jemallocator = "0.5.0"
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:1-slim-bookworm AS builder
|
||||
FROM rust:1.79.0-slim-bookworm AS builder
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
@@ -19,3 +19,4 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
|
||||
WORKDIR /etc/pgcat
|
||||
ENV RUST_LOG=info
|
||||
CMD ["pgcat"]
|
||||
STOPSIGNAL SIGINT
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM cimg/rust:1.67.1
|
||||
FROM cimg/rust:1.79.0
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
RUN /bin/yj -h
|
||||
RUN sudo apt-get update && \
|
||||
@@ -9,6 +9,9 @@ RUN sudo apt-get update && \
|
||||
sudo apt-get upgrade curl && \
|
||||
cargo install cargo-binutils rustfilt && \
|
||||
rustup component add llvm-tools-preview && \
|
||||
pip3 install psycopg2 && sudo gem install bundler && \
|
||||
pip3 install psycopg2 && sudo gem install bundler && \
|
||||
wget -O /tmp/toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
|
||||
sudo dpkg -i /tmp/toxiproxy-2.4.0.deb
|
||||
RUN wget -O /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||
sudo tar -C /usr/local -xzf /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||
rm /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz
|
||||
|
||||
@@ -40,7 +40,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
|
||||
</a>
|
||||
</td>
|
||||
<td>
|
||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
|
||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
|
||||
<img src="./images/postgresml.webp" height="70" width="auto">
|
||||
</a>
|
||||
</td>
|
||||
@@ -57,7 +57,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
|
||||
</a>
|
||||
</td>
|
||||
<td>
|
||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
|
||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
|
||||
PostgresML
|
||||
</a>
|
||||
</td>
|
||||
@@ -268,6 +268,8 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
|
||||
|
||||
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
|
||||
|
||||
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
|
||||
|
||||
### Live configuration reloading
|
||||
|
||||
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
|
||||
|
||||
23
charts/pgcat/.helmignore
Normal file
23
charts/pgcat/.helmignore
Normal file
@@ -0,0 +1,23 @@
|
||||
# Patterns to ignore when building packages.
|
||||
# This supports shell glob matching, relative path matching, and
|
||||
# negation (prefixed with !). Only one pattern per line.
|
||||
.DS_Store
|
||||
# Common VCS dirs
|
||||
.git/
|
||||
.gitignore
|
||||
.bzr/
|
||||
.bzrignore
|
||||
.hg/
|
||||
.hgignore
|
||||
.svn/
|
||||
# Common backup files
|
||||
*.swp
|
||||
*.bak
|
||||
*.tmp
|
||||
*.orig
|
||||
*~
|
||||
# Various IDEs
|
||||
.project
|
||||
.idea/
|
||||
*.tmproj
|
||||
.vscode/
|
||||
8
charts/pgcat/Chart.yaml
Normal file
8
charts/pgcat/Chart.yaml
Normal file
@@ -0,0 +1,8 @@
|
||||
apiVersion: v2
|
||||
name: pgcat
|
||||
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
||||
maintainers:
|
||||
- name: Wildcard
|
||||
email: support@w6d.io
|
||||
appVersion: "1.2.0"
|
||||
version: 0.2.0
|
||||
22
charts/pgcat/templates/NOTES.txt
Normal file
22
charts/pgcat/templates/NOTES.txt
Normal file
@@ -0,0 +1,22 @@
|
||||
1. Get the application URL by running these commands:
|
||||
{{- if .Values.ingress.enabled }}
|
||||
{{- range $host := .Values.ingress.hosts }}
|
||||
{{- range .paths }}
|
||||
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- else if contains "NodePort" .Values.service.type }}
|
||||
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "pgcat.fullname" . }})
|
||||
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
|
||||
echo http://$NODE_IP:$NODE_PORT
|
||||
{{- else if contains "LoadBalancer" .Values.service.type }}
|
||||
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
|
||||
You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "pgcat.fullname" . }}'
|
||||
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "pgcat.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
|
||||
echo http://$SERVICE_IP:{{ .Values.service.port }}
|
||||
{{- else if contains "ClusterIP" .Values.service.type }}
|
||||
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "pgcat.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
|
||||
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
|
||||
echo "Visit http://127.0.0.1:8080 to use your application"
|
||||
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
|
||||
{{- end }}
|
||||
3
charts/pgcat/templates/_config.tpl
Normal file
3
charts/pgcat/templates/_config.tpl
Normal file
@@ -0,0 +1,3 @@
|
||||
{{/*
|
||||
Configuration template definition
|
||||
*/}}
|
||||
62
charts/pgcat/templates/_helpers.tpl
Normal file
62
charts/pgcat/templates/_helpers.tpl
Normal file
@@ -0,0 +1,62 @@
|
||||
{{/*
|
||||
Expand the name of the chart.
|
||||
*/}}
|
||||
{{- define "pgcat.name" -}}
|
||||
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create a default fully qualified app name.
|
||||
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
|
||||
If release name contains chart name it will be used as a full name.
|
||||
*/}}
|
||||
{{- define "pgcat.fullname" -}}
|
||||
{{- if .Values.fullnameOverride }}
|
||||
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
|
||||
{{- else }}
|
||||
{{- $name := default .Chart.Name .Values.nameOverride }}
|
||||
{{- if contains $name .Release.Name }}
|
||||
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
|
||||
{{- else }}
|
||||
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create chart name and version as used by the chart label.
|
||||
*/}}
|
||||
{{- define "pgcat.chart" -}}
|
||||
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Common labels
|
||||
*/}}
|
||||
{{- define "pgcat.labels" -}}
|
||||
helm.sh/chart: {{ include "pgcat.chart" . }}
|
||||
{{ include "pgcat.selectorLabels" . }}
|
||||
{{- if .Chart.AppVersion }}
|
||||
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
|
||||
{{- end }}
|
||||
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Selector labels
|
||||
*/}}
|
||||
{{- define "pgcat.selectorLabels" -}}
|
||||
app.kubernetes.io/name: {{ include "pgcat.name" . }}
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create the name of the service account to use
|
||||
*/}}
|
||||
{{- define "pgcat.serviceAccountName" -}}
|
||||
{{- if .Values.serviceAccount.create }}
|
||||
{{- default (include "pgcat.fullname" .) .Values.serviceAccount.name }}
|
||||
{{- else }}
|
||||
{{- default "default" .Values.serviceAccount.name }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
66
charts/pgcat/templates/deployment.yaml
Normal file
66
charts/pgcat/templates/deployment.yaml
Normal file
@@ -0,0 +1,66 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: {{ include "pgcat.fullname" . }}
|
||||
labels:
|
||||
{{- include "pgcat.labels" . | nindent 4 }}
|
||||
spec:
|
||||
replicas: {{ .Values.replicaCount }}
|
||||
selector:
|
||||
matchLabels:
|
||||
{{- include "pgcat.selectorLabels" . | nindent 6 }}
|
||||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
checksum/secret: {{ include (print $.Template.BasePath "/secret.yaml") . | sha256sum }}
|
||||
{{- with .Values.podAnnotations }}
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
labels:
|
||||
{{- include "pgcat.selectorLabels" . | nindent 8 }}
|
||||
spec:
|
||||
{{- with .Values.image.pullSecrets }}
|
||||
imagePullSecrets:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
serviceAccountName: {{ include "pgcat.serviceAccountName" . }}
|
||||
securityContext:
|
||||
{{- toYaml .Values.podSecurityContext | nindent 8 }}
|
||||
containers:
|
||||
- name: {{ .Chart.Name }}
|
||||
securityContext:
|
||||
{{- toYaml .Values.containerSecurityContext | nindent 12 }}
|
||||
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
|
||||
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
||||
ports:
|
||||
- name: pgcat
|
||||
containerPort: {{ .Values.configuration.general.port }}
|
||||
protocol: TCP
|
||||
livenessProbe:
|
||||
tcpSocket:
|
||||
port: pgcat
|
||||
readinessProbe:
|
||||
tcpSocket:
|
||||
port: pgcat
|
||||
resources:
|
||||
{{- toYaml .Values.resources | nindent 12 }}
|
||||
volumeMounts:
|
||||
- mountPath: /etc/pgcat
|
||||
name: config
|
||||
{{- with .Values.nodeSelector }}
|
||||
nodeSelector:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.affinity }}
|
||||
affinity:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.tolerations }}
|
||||
tolerations:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
volumes:
|
||||
- secret:
|
||||
defaultMode: 420
|
||||
secretName: {{ include "pgcat.fullname" . }}
|
||||
name: config
|
||||
61
charts/pgcat/templates/ingress.yaml
Normal file
61
charts/pgcat/templates/ingress.yaml
Normal file
@@ -0,0 +1,61 @@
|
||||
{{- if .Values.ingress.enabled -}}
|
||||
{{- $fullName := include "pgcat.fullname" . -}}
|
||||
{{- $svcPort := .Values.service.port -}}
|
||||
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
|
||||
{{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
|
||||
{{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}}
|
||||
apiVersion: networking.k8s.io/v1
|
||||
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}}
|
||||
apiVersion: networking.k8s.io/v1beta1
|
||||
{{- else -}}
|
||||
apiVersion: extensions/v1beta1
|
||||
{{- end }}
|
||||
kind: Ingress
|
||||
metadata:
|
||||
name: {{ $fullName }}
|
||||
labels:
|
||||
{{- include "pgcat.labels" . | nindent 4 }}
|
||||
{{- with .Values.ingress.annotations }}
|
||||
annotations:
|
||||
{{- toYaml . | nindent 4 }}
|
||||
{{- end }}
|
||||
spec:
|
||||
{{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }}
|
||||
ingressClassName: {{ .Values.ingress.className }}
|
||||
{{- end }}
|
||||
{{- if .Values.ingress.tls }}
|
||||
tls:
|
||||
{{- range .Values.ingress.tls }}
|
||||
- hosts:
|
||||
{{- range .hosts }}
|
||||
- {{ . | quote }}
|
||||
{{- end }}
|
||||
secretName: {{ .secretName }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
rules:
|
||||
{{- range .Values.ingress.hosts }}
|
||||
- host: {{ .host | quote }}
|
||||
http:
|
||||
paths:
|
||||
{{- range .paths }}
|
||||
- path: {{ .path }}
|
||||
{{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }}
|
||||
pathType: {{ .pathType }}
|
||||
{{- end }}
|
||||
backend:
|
||||
{{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
|
||||
service:
|
||||
name: {{ $fullName }}
|
||||
port:
|
||||
number: {{ $svcPort }}
|
||||
{{- else }}
|
||||
serviceName: {{ $fullName }}
|
||||
servicePort: {{ $svcPort }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
86
charts/pgcat/templates/secret.yaml
Normal file
86
charts/pgcat/templates/secret.yaml
Normal file
@@ -0,0 +1,86 @@
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
name: {{ include "pgcat.fullname" . }}
|
||||
labels:
|
||||
{{- include "pgcat.labels" . | nindent 4 }}
|
||||
type: Opaque
|
||||
stringData:
|
||||
pgcat.toml: |
|
||||
[general]
|
||||
host = {{ .Values.configuration.general.host | quote }}
|
||||
port = {{ .Values.configuration.general.port }}
|
||||
enable_prometheus_exporter = {{ .Values.configuration.general.enable_prometheus_exporter }}
|
||||
prometheus_exporter_port = {{ .Values.configuration.general.prometheus_exporter_port }}
|
||||
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
||||
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
||||
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
||||
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
||||
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
||||
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
||||
shutdown_timeout = {{ .Values.configuration.general.shutdown_timeout }}
|
||||
ban_time = {{ .Values.configuration.general.ban_time }}
|
||||
log_client_connections = {{ .Values.configuration.general.log_client_connections }}
|
||||
log_client_disconnections = {{ .Values.configuration.general.log_client_disconnections }}
|
||||
tcp_keepalives_idle = {{ .Values.configuration.general.tcp_keepalives_idle }}
|
||||
tcp_keepalives_count = {{ .Values.configuration.general.tcp_keepalives_count }}
|
||||
tcp_keepalives_interval = {{ .Values.configuration.general.tcp_keepalives_interval }}
|
||||
{{- if and (ne .Values.configuration.general.tls_certificate "-") (ne .Values.configuration.general.tls_private_key "-") }}
|
||||
tls_certificate = "{{ .Values.configuration.general.tls_certificate }}"
|
||||
tls_private_key = "{{ .Values.configuration.general.tls_private_key }}"
|
||||
{{- end }}
|
||||
admin_username = {{ .Values.configuration.general.admin_username | quote }}
|
||||
admin_password = {{ .Values.configuration.general.admin_password | quote }}
|
||||
{{- if and .Values.configuration.general.auth_query_user .Values.configuration.general.auth_query_password .Values.configuration.general.auth_query }}
|
||||
auth_query = {{ .Values.configuration.general.auth_query | quote }}
|
||||
auth_query_user = {{ .Values.configuration.general.auth_query_user | quote }}
|
||||
auth_query_password = {{ .Values.configuration.general.auth_query_password | quote }}
|
||||
{{- end }}
|
||||
|
||||
{{- range $pool := .Values.configuration.pools }}
|
||||
|
||||
##
|
||||
## pool for {{ $pool.name }}
|
||||
##
|
||||
[pools.{{ $pool.name | quote }}]
|
||||
pool_mode = {{ default "transaction" $pool.pool_mode | quote }}
|
||||
load_balancing_mode = {{ default "random" $pool.load_balancing_mode | quote }}
|
||||
default_role = {{ default "any" $pool.default_role | quote }}
|
||||
prepared_statements_cache_size = {{ default 500 $pool.prepared_statements_cache_size }}
|
||||
query_parser_enabled = {{ default true $pool.query_parser_enabled }}
|
||||
query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }}
|
||||
primary_reads_enabled = {{ default true $pool.primary_reads_enabled }}
|
||||
sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }}
|
||||
|
||||
{{- range $index, $user := $pool.users }}
|
||||
|
||||
## pool {{ $pool.name }} user {{ $user.username | quote }}
|
||||
##
|
||||
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
||||
username = {{ $user.username | quote }}
|
||||
password = {{ $user.password | quote }}
|
||||
pool_size = {{ $user.pool_size }}
|
||||
statement_timeout = {{ $user.statement_timeout }}
|
||||
min_pool_size = 3
|
||||
server_lifetime = 60000
|
||||
{{- if and $user.server_username $user.server_password }}
|
||||
server_username = {{ $user.server_username | quote }}
|
||||
server_password = {{ $user.server_password | quote }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
{{- range $index, $shard := $pool.shards }}
|
||||
|
||||
## pool {{ $pool.name }} database {{ $shard.database }}
|
||||
##
|
||||
[pools.{{ $pool.name | quote }}.shards.{{ $index }}]
|
||||
{{- if gt (len $shard.servers) 0}}
|
||||
servers = [
|
||||
{{- range $server := $shard.servers }}
|
||||
[ {{ $server.host | quote }}, {{ $server.port }}, {{ $server.role | quote }} ],
|
||||
{{- end }}
|
||||
]
|
||||
{{- end }}
|
||||
database = {{ $shard.database | quote }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
15
charts/pgcat/templates/service.yaml
Normal file
15
charts/pgcat/templates/service.yaml
Normal file
@@ -0,0 +1,15 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{ include "pgcat.fullname" . }}
|
||||
labels:
|
||||
{{- include "pgcat.labels" . | nindent 4 }}
|
||||
spec:
|
||||
type: {{ .Values.service.type }}
|
||||
ports:
|
||||
- port: {{ .Values.service.port }}
|
||||
targetPort: pgcat
|
||||
protocol: TCP
|
||||
name: pgcat
|
||||
selector:
|
||||
{{- include "pgcat.selectorLabels" . | nindent 4 }}
|
||||
12
charts/pgcat/templates/serviceaccount.yaml
Normal file
12
charts/pgcat/templates/serviceaccount.yaml
Normal file
@@ -0,0 +1,12 @@
|
||||
{{- if .Values.serviceAccount.create -}}
|
||||
apiVersion: v1
|
||||
kind: ServiceAccount
|
||||
metadata:
|
||||
name: {{ include "pgcat.serviceAccountName" . }}
|
||||
labels:
|
||||
{{- include "pgcat.labels" . | nindent 4 }}
|
||||
{{- with .Values.serviceAccount.annotations }}
|
||||
annotations:
|
||||
{{- toYaml . | nindent 4 }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
369
charts/pgcat/values.yaml
Normal file
369
charts/pgcat/values.yaml
Normal file
@@ -0,0 +1,369 @@
|
||||
## String to partially override aspnet-core.fullname template (will maintain the release name)
|
||||
## @param nameOverride String to partially override common.names.fullname
|
||||
##
|
||||
nameOverride: ""
|
||||
|
||||
## String to fully override aspnet-core.fullname template
|
||||
## @param fullnameOverride String to fully override common.names.fullname
|
||||
##
|
||||
fullnameOverride: ""
|
||||
|
||||
## Number of PgCat replicas to deploy
|
||||
## @param replicaCount Number of PgCat replicas to deploy
|
||||
replicaCount: 1
|
||||
|
||||
## Bitnami PgCat image version
|
||||
## ref: https://hub.docker.com/r/bitnami/kubewatch/tags/
|
||||
##
|
||||
## @param image.registry PgCat image registry
|
||||
## @param image.repository PgCat image name
|
||||
## @param image.tag PgCat image tag
|
||||
## @param image.pullPolicy PgCat image tag
|
||||
## @param image.pullSecrets Specify docker-registry secret names as an array
|
||||
image:
|
||||
repository: ghcr.io/postgresml/pgcat
|
||||
# Overrides the image tag whose default is the chart appVersion.
|
||||
tag: "main"
|
||||
## Specify a imagePullPolicy
|
||||
## Defaults to 'Always' if image tag is 'latest', else set to 'IfNotPresent'
|
||||
## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images
|
||||
##
|
||||
pullPolicy: IfNotPresent
|
||||
## Optionally specify an array of imagePullSecrets.
|
||||
## Secrets must be manually created in the namespace.
|
||||
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
|
||||
## Example:
|
||||
## pullSecrets:
|
||||
## - myRegistryKeySecretName
|
||||
##
|
||||
pullSecrets: []
|
||||
|
||||
## Specifies whether a ServiceAccount should be created
|
||||
##
|
||||
## @param serviceAccount.create Enable the creation of a ServiceAccount for PgCat pods
|
||||
## @param serviceAccount.name Name of the created ServiceAccount
|
||||
##
|
||||
serviceAccount:
|
||||
## Specifies whether a service account should be created
|
||||
create: true
|
||||
## Annotations to add to the service account
|
||||
annotations: {}
|
||||
## The name of the service account to use.
|
||||
## If not set and create is true, a name is generated using the fullname template
|
||||
name: ""
|
||||
|
||||
## Annotations for server pods.
|
||||
## ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
|
||||
##
|
||||
## @param podAnnotations Annotations for PgCat pods
|
||||
##
|
||||
podAnnotations: {}
|
||||
|
||||
## PgCat containers' SecurityContext
|
||||
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod
|
||||
##
|
||||
## @param podSecurityContext.enabled Enabled PgCat pods' Security Context
|
||||
## @param podSecurityContext.fsGroup Set PgCat pod's Security Context fsGroup
|
||||
##
|
||||
podSecurityContext: {}
|
||||
# fsGroup: 2000
|
||||
|
||||
## PgCat pods' Security Context
|
||||
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-container
|
||||
##
|
||||
## @param containerSecurityContext.enabled Enabled PgCat containers' Security Context
|
||||
## @param containerSecurityContext.runAsUser Set PgCat container's Security Context runAsUser
|
||||
## @param containerSecurityContext.runAsNonRoot Set PgCat container's Security Context runAsNonRoot
|
||||
##
|
||||
containerSecurityContext: {}
|
||||
# capabilities:
|
||||
# drop:
|
||||
# - ALL
|
||||
# readOnlyRootFilesystem: true
|
||||
# runAsNonRoot: true
|
||||
# runAsUser: 1000
|
||||
|
||||
## PgCat service
|
||||
##
|
||||
## @param service.type PgCat service type
|
||||
## @param service.port PgCat service port
|
||||
service:
|
||||
type: ClusterIP
|
||||
port: 6432
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
className: ""
|
||||
annotations: {}
|
||||
# kubernetes.io/ingress.class: nginx
|
||||
# kubernetes.io/tls-acme: "true"
|
||||
hosts:
|
||||
- host: chart-example.local
|
||||
paths:
|
||||
- path: /
|
||||
pathType: ImplementationSpecific
|
||||
tls: []
|
||||
# - secretName: chart-example-tls
|
||||
# hosts:
|
||||
# - chart-example.local
|
||||
|
||||
## PgCat resource requests and limits
|
||||
## ref: http://kubernetes.io/docs/user-guide/compute-resources/
|
||||
##
|
||||
## @skip resources Optional description
|
||||
## @disabled-param resources.limits The resources limits for the PgCat container
|
||||
## @disabled-param resources.requests The requested resources for the PgCat container
|
||||
##
|
||||
resources:
|
||||
# We usually recommend not to specify default resources and to leave this as a conscious
|
||||
# choice for the user. This also increases chances charts run on environments with little
|
||||
# resources, such as Minikube. If you do want to specify resources, uncomment the following
|
||||
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
|
||||
limits: {}
|
||||
# cpu: 100m
|
||||
# memory: 128Mi
|
||||
requests: {}
|
||||
# cpu: 100m
|
||||
# memory: 128Mi
|
||||
|
||||
## Node labels for pod assignment. Evaluated as a template.
|
||||
## ref: https://kubernetes.io/docs/user-guide/node-selection/
|
||||
##
|
||||
## @param nodeSelector Node labels for pod assignment
|
||||
##
|
||||
nodeSelector: {}
|
||||
|
||||
## Tolerations for pod assignment. Evaluated as a template.
|
||||
## ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
|
||||
##
|
||||
## @param tolerations Tolerations for pod assignment
|
||||
##
|
||||
tolerations: []
|
||||
|
||||
## Affinity for pod assignment. Evaluated as a template.
|
||||
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
|
||||
## Note: podAffinityPreset, podAntiAffinityPreset, and nodeAffinityPreset will be ignored when it's set
|
||||
##
|
||||
## @param affinity Affinity for pod assignment
|
||||
##
|
||||
affinity: {}
|
||||
|
||||
## PgCat configuration
|
||||
## @param configuration [object]
|
||||
configuration:
|
||||
## General pooler settings
|
||||
## @param [object]
|
||||
general:
|
||||
## @param configuration.general.host What IP to run on, 0.0.0.0 means accessible from everywhere.
|
||||
host: "0.0.0.0"
|
||||
|
||||
## @param configuration.general.port Port to run on, same as PgBouncer used in this example.
|
||||
port: 6432
|
||||
|
||||
## @param configuration.general.enable_prometheus_exporter Whether to enable prometheus exporter or not.
|
||||
enable_prometheus_exporter: false
|
||||
|
||||
## @param configuration.general.prometheus_exporter_port Port at which prometheus exporter listens on.
|
||||
prometheus_exporter_port: 9930
|
||||
|
||||
# @param configuration.general.connect_timeout How long to wait before aborting a server connection (ms).
|
||||
connect_timeout: 5000
|
||||
|
||||
# How long an idle connection with a server is left open (ms).
|
||||
idle_timeout: 30000 # milliseconds
|
||||
|
||||
# Max connection lifetime before it's closed, even if actively used.
|
||||
server_lifetime: 86400000 # 24 hours
|
||||
|
||||
# How long a client is allowed to be idle while in a transaction (ms).
|
||||
idle_client_in_transaction_timeout: 0 # milliseconds
|
||||
|
||||
# @param configuration.general.healthcheck_timeout How much time to give `SELECT 1` health check query to return with a result (ms).
|
||||
healthcheck_timeout: 1000
|
||||
|
||||
# @param configuration.general.healthcheck_delay How long to keep connection available for immediate re-use, without running a healthcheck query on it
|
||||
healthcheck_delay: 30000
|
||||
|
||||
# @param configuration.general.shutdown_timeout How much time to give clients during shutdown before forcibly killing client connections (ms).
|
||||
shutdown_timeout: 60000
|
||||
|
||||
# @param configuration.general.ban_time For how long to ban a server if it fails a health check (seconds).
|
||||
ban_time: 60 # seconds
|
||||
|
||||
# @param configuration.general.log_client_connections If we should log client connections
|
||||
log_client_connections: false
|
||||
|
||||
# @param configuration.general.log_client_disconnections If we should log client disconnections
|
||||
log_client_disconnections: false
|
||||
|
||||
# TLS
|
||||
# tls_certificate: "server.cert"
|
||||
# tls_private_key: "server.key"
|
||||
tls_certificate: "-"
|
||||
tls_private_key: "-"
|
||||
|
||||
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
|
||||
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
|
||||
admin_username: "postgres"
|
||||
admin_password: "postgres"
|
||||
|
||||
# Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
|
||||
# established using the database configured in the pool. This parameter is inherited by every pool and
|
||||
# can be redefined in pool configuration.
|
||||
auth_query: null
|
||||
|
||||
# User to be used for connecting to servers to obtain the hash used for md5 authentication by sending
|
||||
# the query specified in auth_query_user. The connection will be established using the database configured
|
||||
# in the pool. This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
#
|
||||
# @param configuration.general.auth_query_user
|
||||
auth_query_user: null
|
||||
|
||||
# Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending
|
||||
# the query specified in auth_query_user. The connection will be established using the database configured
|
||||
# in the pool. This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
#
|
||||
# @param configuration.general.auth_query_password
|
||||
auth_query_password: null
|
||||
|
||||
# Number of seconds of connection idleness to wait before sending a keepalive packet to the server.
|
||||
tcp_keepalives_idle: 5
|
||||
|
||||
# Number of unacknowledged keepalive packets allowed before giving up and closing the connection.
|
||||
tcp_keepalives_count: 5
|
||||
|
||||
# Number of seconds between keepalive packets.
|
||||
tcp_keepalives_interval: 5
|
||||
|
||||
## pool
|
||||
## configs are structured as pool.<pool_name>
|
||||
## the pool_name is what clients use as database name when connecting
|
||||
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
|
||||
## @param [object]
|
||||
pools:
|
||||
[{
|
||||
name: "simple", pool_mode: "transaction",
|
||||
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
|
||||
shards: [{
|
||||
servers: [{host: "postgres", port: 5432, role: "primary"}],
|
||||
database: "postgres"
|
||||
}]
|
||||
}]
|
||||
# - ## default values
|
||||
# ##
|
||||
# ##
|
||||
# ##
|
||||
# name: "db"
|
||||
|
||||
# ## Pool mode (see PgBouncer docs for more).
|
||||
# ## session: one server connection per connected client
|
||||
# ## transaction: one server connection per client transaction
|
||||
# ## @param configuration.poolsPostgres.pool_mode
|
||||
# pool_mode: "transaction"
|
||||
|
||||
# ## Load balancing mode
|
||||
# ## `random` selects the server at random
|
||||
# ## `loc` selects the server with the least outstanding busy connections
|
||||
# ##
|
||||
# ## @param configuration.poolsPostgres.load_balancing_mode
|
||||
# load_balancing_mode: "random"
|
||||
|
||||
# ## Prepared statements cache size.
|
||||
# ## TODO: update documentation
|
||||
# ##
|
||||
# ## @param configuration.poolsPostgres.prepared_statements_cache_size
|
||||
# prepared_statements_cache_size: 500
|
||||
|
||||
# ## If the client doesn't specify, route traffic to
|
||||
# ## this role by default.
|
||||
# ##
|
||||
# ## any: round-robin between primary and replicas,
|
||||
# ## replica: round-robin between replicas only without touching the primary,
|
||||
# ## primary: all queries go to the primary unless otherwise specified.
|
||||
# ## @param configuration.poolsPostgres.default_role
|
||||
# default_role: "any"
|
||||
|
||||
# ## Query parser. If enabled, we'll attempt to parse
|
||||
# ## every incoming query to determine if it's a read or a write.
|
||||
# ## If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
|
||||
# ## we'll direct it to the primary.
|
||||
# ## @param configuration.poolsPostgres.query_parser_enabled
|
||||
# query_parser_enabled: true
|
||||
|
||||
# ## If the query parser is enabled and this setting is enabled, we'll attempt to
|
||||
# ## infer the role from the query itself.
|
||||
# ## @param configuration.poolsPostgres.query_parser_read_write_splitting
|
||||
# query_parser_read_write_splitting: true
|
||||
|
||||
# ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
|
||||
# ## load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
# ## queries. The primary can always be explicitly selected with our custom protocol.
|
||||
# ## @param configuration.poolsPostgres.primary_reads_enabled
|
||||
# primary_reads_enabled: true
|
||||
|
||||
# ## So what if you wanted to implement a different hashing function,
|
||||
# ## or you've already built one and you want this pooler to use it?
|
||||
# ##
|
||||
# ## Current options:
|
||||
# ##
|
||||
# ## pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
|
||||
# ## sha1: A hashing function based on SHA1
|
||||
# ##
|
||||
# ## @param configuration.poolsPostgres.sharding_function
|
||||
# sharding_function: "pg_bigint_hash"
|
||||
|
||||
# ## Credentials for users that may connect to this cluster
|
||||
# ## @param users [array]
|
||||
# ## @param users[0].username Name of the env var (required)
|
||||
# ## @param users[0].password Value for the env var (required)
|
||||
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
||||
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
# users: []
|
||||
# # - username: "user"
|
||||
# # password: "pass"
|
||||
# #
|
||||
# # # The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||
# # # is the sum of pool_size across all users.
|
||||
# # pool_size: 9
|
||||
# #
|
||||
# # # Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
# # statement_timeout: 0
|
||||
# #
|
||||
# # # PostgreSQL username used to connect to the server.
|
||||
# # server_username: "postgres
|
||||
# #
|
||||
# # # PostgreSQL password used to connect to the server.
|
||||
# # server_password: "postgres
|
||||
|
||||
# ## @param shards [array]
|
||||
# ## @param shards[0].server[0].host Host for this shard
|
||||
# ## @param shards[0].server[0].port Port for this shard
|
||||
# ## @param shards[0].server[0].role Role for this shard
|
||||
# shards: []
|
||||
# # [ host, port, role ]
|
||||
# # - servers:
|
||||
# # - host: "postgres"
|
||||
# # port: 5432
|
||||
# # role: "primary"
|
||||
# # - host: "postgres"
|
||||
# # port: 5432
|
||||
# # role: "replica"
|
||||
# # database: "postgres"
|
||||
# # # [ host, port, role ]
|
||||
# # - servers:
|
||||
# # - host: "postgres"
|
||||
# # port: 5432
|
||||
# # role: "primary"
|
||||
# # - host: "postgres"
|
||||
# # port: 5432
|
||||
# # role: "replica"
|
||||
# # database: "postgres"
|
||||
# # # [ host, port, role ]
|
||||
# # - servers:
|
||||
# # - host: "postgres"
|
||||
# # port: 5432
|
||||
# # role: "primary"
|
||||
# # - host: "postgres"
|
||||
# # port: 5432
|
||||
# # role: "replica"
|
||||
# # database: "postgres"
|
||||
5
ct.yaml
Normal file
5
ct.yaml
Normal file
@@ -0,0 +1,5 @@
|
||||
remote: origin
|
||||
target-branch: main
|
||||
chart-dirs:
|
||||
- charts
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:1.70-bullseye
|
||||
FROM rust:bullseye
|
||||
|
||||
# Dependencies
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
|
||||
2124
grafana_dashboard.json
Normal file
2124
grafana_dashboard.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -11,6 +11,7 @@ RestartSec=1
|
||||
Environment=RUST_LOG=info
|
||||
LimitNOFILE=65536
|
||||
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
|
||||
ExecReload=/bin/kill -SIGHUP $MAINPID
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
@@ -301,6 +301,8 @@ username = "other_user"
|
||||
password = "other_user"
|
||||
pool_size = 21
|
||||
statement_timeout = 15000
|
||||
connect_timeout = 1000
|
||||
idle_timeout = 1000
|
||||
|
||||
# Shard configs are structured as pool.<pool_name>.shards.<shard_id>
|
||||
# Each shard config contains a list of servers that make up the shard
|
||||
|
||||
19
src/admin.rs
19
src/admin.rs
@@ -55,7 +55,12 @@ where
|
||||
|
||||
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
|
||||
|
||||
match query_parts[0].to_ascii_uppercase().as_str() {
|
||||
match query_parts
|
||||
.first()
|
||||
.unwrap_or(&"")
|
||||
.to_ascii_uppercase()
|
||||
.as_str()
|
||||
{
|
||||
"BAN" => {
|
||||
trace!("BAN");
|
||||
ban(stream, query_parts).await
|
||||
@@ -84,7 +89,12 @@ where
|
||||
trace!("SHUTDOWN");
|
||||
shutdown(stream).await
|
||||
}
|
||||
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
|
||||
"SHOW" => match query_parts
|
||||
.get(1)
|
||||
.unwrap_or(&"")
|
||||
.to_ascii_uppercase()
|
||||
.as_str()
|
||||
{
|
||||
"HELP" => {
|
||||
trace!("SHOW HELP");
|
||||
show_help(stream).await
|
||||
@@ -690,6 +700,8 @@ where
|
||||
("query_count", DataType::Numeric),
|
||||
("error_count", DataType::Numeric),
|
||||
("age_seconds", DataType::Numeric),
|
||||
("maxwait", DataType::Numeric),
|
||||
("maxwait_us", DataType::Numeric),
|
||||
];
|
||||
|
||||
let new_map = get_client_stats();
|
||||
@@ -697,6 +709,7 @@ where
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for (_, client) in new_map {
|
||||
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
|
||||
let row = vec![
|
||||
format!("{:#010X}", client.client_id()),
|
||||
client.pool_name(),
|
||||
@@ -710,6 +723,8 @@ where
|
||||
.duration_since(client.connect_time())
|
||||
.as_secs()
|
||||
.to_string(),
|
||||
(max_wait / 1_000_000).to_string(),
|
||||
(max_wait % 1_000_000).to_string(),
|
||||
];
|
||||
|
||||
res.put(data_row(&row));
|
||||
|
||||
@@ -79,6 +79,8 @@ impl AuthPassthrough {
|
||||
pool_mode: None,
|
||||
server_lifetime: None,
|
||||
min_pool_size: None,
|
||||
connect_timeout: None,
|
||||
idle_timeout: None,
|
||||
};
|
||||
|
||||
let user = &address.username;
|
||||
|
||||
@@ -1149,7 +1149,7 @@ where
|
||||
// This reads the first byte without advancing the internal pointer and mutating the bytes
|
||||
let code = *message.first().unwrap() as char;
|
||||
|
||||
trace!("Message: {}", code);
|
||||
trace!("Client message: {}", code);
|
||||
|
||||
match code {
|
||||
// Query
|
||||
@@ -1188,6 +1188,7 @@ where
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Sending query to server");
|
||||
|
||||
self.send_and_receive_loop(
|
||||
@@ -1320,6 +1321,7 @@ where
|
||||
{
|
||||
match protocol_data {
|
||||
ExtendedProtocolData::Parse { data, metadata } => {
|
||||
debug!("Have parse in extended buffer");
|
||||
let (parse, hash) = match metadata {
|
||||
Some(metadata) => metadata,
|
||||
None => {
|
||||
@@ -1435,7 +1437,7 @@ where
|
||||
.await
|
||||
{
|
||||
// We might be in some kind of error/in between protocol state
|
||||
server.mark_bad();
|
||||
server.mark_bad(err.to_string().as_str());
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
@@ -1502,7 +1504,7 @@ where
|
||||
match write_all_flush(&mut self.write, &response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
server.mark_bad(err.to_string().as_str());
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
@@ -1656,11 +1658,25 @@ where
|
||||
) -> Result<(), Error> {
|
||||
match self.prepared_statements.get(&client_name) {
|
||||
Some((parse, hash)) => {
|
||||
debug!("Prepared statement `{}` found in cache", parse.name);
|
||||
debug!("Prepared statement `{}` found in cache", client_name);
|
||||
// In this case we want to send the parse message to the server
|
||||
// since pgcat is initiating the prepared statement on this specific server
|
||||
self.register_parse_to_server_cache(true, hash, parse, pool, server, address)
|
||||
.await?;
|
||||
match self
|
||||
.register_parse_to_server_cache(true, hash, parse, pool, server, address)
|
||||
.await
|
||||
{
|
||||
Ok(_) => (),
|
||||
Err(err) => match err {
|
||||
Error::PreparedStatementError => {
|
||||
debug!("Removed {} from client cache", client_name);
|
||||
self.prepared_statements.remove(&client_name);
|
||||
}
|
||||
|
||||
_ => {
|
||||
return Err(err);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
@@ -1689,11 +1705,20 @@ where
|
||||
// We want to promote this in the pool's LRU
|
||||
pool.promote_prepared_statement_hash(hash);
|
||||
|
||||
debug!("Checking for prepared statement {}", parse.name);
|
||||
|
||||
if let Err(err) = server
|
||||
.register_prepared_statement(parse, should_send_parse_to_server)
|
||||
.await
|
||||
{
|
||||
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
|
||||
match err {
|
||||
// Don't ban for this.
|
||||
Error::PreparedStatementError => (),
|
||||
_ => {
|
||||
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
|
||||
}
|
||||
};
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
@@ -1704,18 +1729,14 @@ where
|
||||
/// and also the pool's statement cache. Add it to extended protocol data.
|
||||
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
// Avoid parsing if prepared statements not enabled
|
||||
let client_given_name = match self.prepared_statements_enabled {
|
||||
true => Parse::get_name(&message)?,
|
||||
false => "".to_string(),
|
||||
};
|
||||
|
||||
if client_given_name.is_empty() {
|
||||
if !self.prepared_statements_enabled {
|
||||
debug!("Anonymous parse message");
|
||||
self.extended_protocol_data_buffer
|
||||
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let client_given_name = Parse::get_name(&message)?;
|
||||
let parse: Parse = (&message).try_into()?;
|
||||
|
||||
// Compute the hash of the parse statement
|
||||
@@ -1753,18 +1774,15 @@ where
|
||||
/// saved in the client cache.
|
||||
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
||||
// Avoid parsing if prepared statements not enabled
|
||||
let client_given_name = match self.prepared_statements_enabled {
|
||||
true => Bind::get_name(&message)?,
|
||||
false => "".to_string(),
|
||||
};
|
||||
|
||||
if client_given_name.is_empty() {
|
||||
if !self.prepared_statements_enabled {
|
||||
debug!("Anonymous bind message");
|
||||
self.extended_protocol_data_buffer
|
||||
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let client_given_name = Bind::get_name(&message)?;
|
||||
|
||||
match self.prepared_statements.get(&client_given_name) {
|
||||
Some((rewritten_parse, _)) => {
|
||||
let message = Bind::rename(message, &rewritten_parse.name)?;
|
||||
@@ -1807,12 +1825,7 @@ where
|
||||
/// saved in the client cache.
|
||||
async fn buffer_describe(&mut self, message: BytesMut) -> Result<(), Error> {
|
||||
// Avoid parsing if prepared statements not enabled
|
||||
let describe: Describe = match self.prepared_statements_enabled {
|
||||
true => (&message).try_into()?,
|
||||
false => Describe::empty_new(),
|
||||
};
|
||||
|
||||
if describe.anonymous() {
|
||||
if !self.prepared_statements_enabled {
|
||||
debug!("Anonymous describe message");
|
||||
self.extended_protocol_data_buffer
|
||||
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
||||
@@ -1820,6 +1833,15 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let describe: Describe = (&message).try_into()?;
|
||||
if describe.target == 'P' {
|
||||
debug!("Portal describe message");
|
||||
self.extended_protocol_data_buffer
|
||||
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let client_given_name = describe.statement_name.clone();
|
||||
|
||||
match self.prepared_statements.get(&client_given_name) {
|
||||
@@ -1904,7 +1926,7 @@ where
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
// We might be in some kind of error/in between protocol state, better to just kill this server
|
||||
server.mark_bad();
|
||||
server.mark_bad(err.to_string().as_str());
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
@@ -1971,11 +1993,13 @@ where
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!(
|
||||
"Statement timeout while talking to {:?} with user {}",
|
||||
address, pool.settings.user.username
|
||||
server.mark_bad(
|
||||
format!(
|
||||
"Statement timeout while talking to {:?} with user {}",
|
||||
address, pool.settings.user.username
|
||||
)
|
||||
.as_str(),
|
||||
);
|
||||
server.mark_bad();
|
||||
pool.ban(address, BanReason::StatementTimeout, Some(client_stats));
|
||||
error_response_terminal(&mut self.write, "pool statement timeout").await?;
|
||||
Err(Error::StatementTimeout)
|
||||
|
||||
@@ -38,12 +38,12 @@ pub enum Role {
|
||||
Mirror,
|
||||
}
|
||||
|
||||
impl ToString for Role {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
Role::Primary => "primary".to_string(),
|
||||
Role::Replica => "replica".to_string(),
|
||||
Role::Mirror => "mirror".to_string(),
|
||||
impl std::fmt::Display for Role {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Role::Primary => write!(f, "primary"),
|
||||
Role::Replica => write!(f, "replica"),
|
||||
Role::Mirror => write!(f, "mirror"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,6 +216,8 @@ pub struct User {
|
||||
pub server_lifetime: Option<u64>,
|
||||
#[serde(default)] // 0
|
||||
pub statement_timeout: u64,
|
||||
pub connect_timeout: Option<u64>,
|
||||
pub idle_timeout: Option<u64>,
|
||||
}
|
||||
|
||||
impl Default for User {
|
||||
@@ -230,6 +232,8 @@ impl Default for User {
|
||||
statement_timeout: 0,
|
||||
pool_mode: None,
|
||||
server_lifetime: None,
|
||||
connect_timeout: None,
|
||||
idle_timeout: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -472,11 +476,11 @@ pub enum PoolMode {
|
||||
Session,
|
||||
}
|
||||
|
||||
impl ToString for PoolMode {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
PoolMode::Transaction => "transaction".to_string(),
|
||||
PoolMode::Session => "session".to_string(),
|
||||
impl std::fmt::Display for PoolMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
PoolMode::Transaction => write!(f, "transaction"),
|
||||
PoolMode::Session => write!(f, "session"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -489,12 +493,13 @@ pub enum LoadBalancingMode {
|
||||
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
|
||||
LeastOutstandingConnections,
|
||||
}
|
||||
impl ToString for LoadBalancingMode {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
LoadBalancingMode::Random => "random".to_string(),
|
||||
|
||||
impl std::fmt::Display for LoadBalancingMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
LoadBalancingMode::Random => write!(f, "random"),
|
||||
LoadBalancingMode::LeastOutstandingConnections => {
|
||||
"least_outstanding_connections".to_string()
|
||||
write!(f, "least_outstanding_connections")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -995,15 +1000,17 @@ impl Config {
|
||||
pub fn fill_up_auth_query_config(&mut self) {
|
||||
for (_name, pool) in self.pools.iter_mut() {
|
||||
if pool.auth_query.is_none() {
|
||||
pool.auth_query = self.general.auth_query.clone();
|
||||
pool.auth_query.clone_from(&self.general.auth_query);
|
||||
}
|
||||
|
||||
if pool.auth_query_user.is_none() {
|
||||
pool.auth_query_user = self.general.auth_query_user.clone();
|
||||
pool.auth_query_user
|
||||
.clone_from(&self.general.auth_query_user);
|
||||
}
|
||||
|
||||
if pool.auth_query_password.is_none() {
|
||||
pool.auth_query_password = self.general.auth_query_password.clone();
|
||||
pool.auth_query_password
|
||||
.clone_from(&self.general.auth_query_password);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1151,7 +1158,7 @@ impl Config {
|
||||
"Default max server lifetime: {}ms",
|
||||
self.general.server_lifetime
|
||||
);
|
||||
info!("Sever round robin: {}", self.general.server_round_robin);
|
||||
info!("Server round robin: {}", self.general.server_round_robin);
|
||||
match self.general.tls_certificate.clone() {
|
||||
Some(tls_certificate) => {
|
||||
info!("TLS certificate: {}", tls_certificate);
|
||||
@@ -1307,6 +1314,24 @@ impl Config {
|
||||
None => "default".to_string(),
|
||||
}
|
||||
);
|
||||
info!(
|
||||
"[pool: {}][user: {}] Connection timeout: {}",
|
||||
pool_name,
|
||||
user.1.username,
|
||||
match user.1.connect_timeout {
|
||||
Some(connect_timeout) => format!("{}ms", connect_timeout),
|
||||
None => "not set".to_string(),
|
||||
}
|
||||
);
|
||||
info!(
|
||||
"[pool: {}][user: {}] Idle timeout: {}",
|
||||
pool_name,
|
||||
user.1.username,
|
||||
match user.1.idle_timeout {
|
||||
Some(idle_timeout) => format!("{}ms", idle_timeout),
|
||||
None => "not set".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ pub enum Error {
|
||||
QueryRouterParserError(String),
|
||||
QueryRouterError(String),
|
||||
InvalidShardId(usize),
|
||||
PreparedStatementError,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
|
||||
@@ -733,6 +733,10 @@ pub fn configure_socket(stream: &TcpStream) {
|
||||
}
|
||||
Err(err) => error!("Could not configure socket: {}", err),
|
||||
}
|
||||
match sock_ref.set_nodelay(true) {
|
||||
Ok(_) => (),
|
||||
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
|
||||
}
|
||||
}
|
||||
|
||||
pub trait BytesMutReader {
|
||||
@@ -1109,7 +1113,7 @@ pub struct Describe {
|
||||
|
||||
#[allow(dead_code)]
|
||||
len: i32,
|
||||
target: char,
|
||||
pub target: char,
|
||||
pub statement_name: String,
|
||||
}
|
||||
|
||||
|
||||
@@ -85,8 +85,9 @@ impl MirroredClient {
|
||||
match recv_result {
|
||||
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
error!("Failed to receive from mirror {:?} {:?}", err, address.clone());
|
||||
server.mark_bad(
|
||||
format!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()).as_str()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -98,8 +99,9 @@ impl MirroredClient {
|
||||
match server.send(&BytesMut::from(&bytes[..])).await {
|
||||
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone())
|
||||
server.mark_bad(
|
||||
format!("Failed to receive from mirror {:?} {:?}", err, address.clone()).as_str()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
28
src/pool.rs
28
src/pool.rs
@@ -436,14 +436,20 @@ impl ConnectionPool {
|
||||
pool_config.prepared_statements_cache_size,
|
||||
);
|
||||
|
||||
let connect_timeout = match pool_config.connect_timeout {
|
||||
let connect_timeout = match user.connect_timeout {
|
||||
Some(connect_timeout) => connect_timeout,
|
||||
None => config.general.connect_timeout,
|
||||
None => match pool_config.connect_timeout {
|
||||
Some(connect_timeout) => connect_timeout,
|
||||
None => config.general.connect_timeout,
|
||||
},
|
||||
};
|
||||
|
||||
let idle_timeout = match pool_config.idle_timeout {
|
||||
let idle_timeout = match user.idle_timeout {
|
||||
Some(idle_timeout) => idle_timeout,
|
||||
None => config.general.idle_timeout,
|
||||
None => match pool_config.idle_timeout {
|
||||
Some(idle_timeout) => idle_timeout,
|
||||
None => config.general.idle_timeout,
|
||||
},
|
||||
};
|
||||
|
||||
let server_lifetime = match user.server_lifetime {
|
||||
@@ -763,7 +769,6 @@ impl ConnectionPool {
|
||||
);
|
||||
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
|
||||
address.stats.error();
|
||||
client_stats.idle();
|
||||
client_stats.checkout_error();
|
||||
continue;
|
||||
}
|
||||
@@ -782,7 +787,7 @@ impl ConnectionPool {
|
||||
// Health checks are pretty expensive.
|
||||
if !require_healthcheck {
|
||||
let checkout_time = now.elapsed().as_micros() as u64;
|
||||
client_stats.checkout_time(checkout_time);
|
||||
client_stats.checkout_success();
|
||||
server
|
||||
.stats()
|
||||
.checkout_time(checkout_time, client_stats.application_name());
|
||||
@@ -796,7 +801,7 @@ impl ConnectionPool {
|
||||
.await
|
||||
{
|
||||
let checkout_time = now.elapsed().as_micros() as u64;
|
||||
client_stats.checkout_time(checkout_time);
|
||||
client_stats.checkout_success();
|
||||
server
|
||||
.stats()
|
||||
.checkout_time(checkout_time, client_stats.application_name());
|
||||
@@ -808,10 +813,7 @@ impl ConnectionPool {
|
||||
}
|
||||
}
|
||||
|
||||
client_stats.idle();
|
||||
|
||||
let checkout_time = now.elapsed().as_micros() as u64;
|
||||
client_stats.checkout_time(checkout_time);
|
||||
client_stats.checkout_error();
|
||||
|
||||
Err(Error::AllServersDown)
|
||||
}
|
||||
@@ -837,7 +839,7 @@ impl ConnectionPool {
|
||||
Ok(res) => match res {
|
||||
Ok(_) => {
|
||||
let checkout_time: u64 = start.elapsed().as_micros() as u64;
|
||||
client_info.checkout_time(checkout_time);
|
||||
client_info.checkout_success();
|
||||
server
|
||||
.stats()
|
||||
.checkout_time(checkout_time, client_info.application_name());
|
||||
@@ -865,7 +867,7 @@ impl ConnectionPool {
|
||||
}
|
||||
|
||||
// Don't leave a bad connection in the pool.
|
||||
server.mark_bad();
|
||||
server.mark_bad("failed health check");
|
||||
|
||||
self.ban(address, BanReason::FailedHealthCheck, Some(client_info));
|
||||
false
|
||||
|
||||
@@ -1,23 +1,41 @@
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use http_body_util::Full;
|
||||
use hyper::body;
|
||||
use hyper::body::Bytes;
|
||||
|
||||
use hyper::server::conn::http1;
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{Method, Request, Response, StatusCode};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use log::{debug, error, info};
|
||||
use phf::phf_map;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use crate::config::Address;
|
||||
use crate::pool::{get_all_pools, PoolIdentifier};
|
||||
use crate::stats::get_server_stats;
|
||||
use crate::stats::pool::PoolStats;
|
||||
use crate::stats::{get_server_stats, ServerStats};
|
||||
|
||||
struct MetricHelpType {
|
||||
help: &'static str,
|
||||
ty: &'static str,
|
||||
}
|
||||
|
||||
struct ServerPrometheusStats {
|
||||
bytes_received: u64,
|
||||
bytes_sent: u64,
|
||||
transaction_count: u64,
|
||||
query_count: u64,
|
||||
error_count: u64,
|
||||
active_count: u64,
|
||||
idle_count: u64,
|
||||
login_count: u64,
|
||||
tested_count: u64,
|
||||
}
|
||||
|
||||
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
|
||||
// counters only increase
|
||||
// gauges can arbitrarily increase or decrease
|
||||
@@ -120,22 +138,46 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
|
||||
},
|
||||
"servers_bytes_received" => MetricHelpType {
|
||||
help: "Volume in bytes of network traffic received by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_bytes_sent" => MetricHelpType {
|
||||
help: "Volume in bytes of network traffic sent by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_transaction_count" => MetricHelpType {
|
||||
help: "Number of transactions executed by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_query_count" => MetricHelpType {
|
||||
help: "Number of queries executed by server",
|
||||
ty: "gauge",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_error_count" => MetricHelpType {
|
||||
help: "Number of errors",
|
||||
ty: "counter",
|
||||
},
|
||||
"servers_idle_count" => MetricHelpType {
|
||||
help: "Number of server connection in idle state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_active_count" => MetricHelpType {
|
||||
help: "Number of server connection in active state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_tested_count" => MetricHelpType {
|
||||
help: "Number of server connection in tested state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_login_count" => MetricHelpType {
|
||||
help: "Number of server connection in login state",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_is_banned" => MetricHelpType {
|
||||
help: "0 if server is not banned, 1 if server is banned",
|
||||
ty: "gauge",
|
||||
},
|
||||
"servers_is_paused" => MetricHelpType {
|
||||
help: "0 if server is not paused, 1 if server is paused",
|
||||
ty: "gauge",
|
||||
},
|
||||
"databases_pool_size" => MetricHelpType {
|
||||
@@ -203,7 +245,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("shard", address.shard.to_string());
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("user", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||
}
|
||||
@@ -218,8 +262,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("shard", address.shard.to_string());
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
|
||||
labels.insert("user", address.username.clone());
|
||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||
}
|
||||
|
||||
@@ -229,7 +274,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("shard", address.shard.to_string());
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("user", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||
}
|
||||
@@ -243,7 +290,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
|
||||
async fn prometheus_stats(
|
||||
request: Request<body::Incoming>,
|
||||
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
|
||||
match (request.method(), request.uri().path()) {
|
||||
(&Method::GET, "/metrics") => {
|
||||
let mut lines = Vec::new();
|
||||
@@ -329,34 +378,51 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||
fn push_server_stats(lines: &mut Vec<String>) {
|
||||
let server_stats = get_server_stats();
|
||||
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
|
||||
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
|
||||
for (_, stats) in server_stats {
|
||||
server_stats_by_addresses.insert(stats.address_name(), stats);
|
||||
let entry = prom_stats
|
||||
.entry(stats.address_name())
|
||||
.or_insert(ServerPrometheusStats {
|
||||
bytes_received: 0,
|
||||
bytes_sent: 0,
|
||||
transaction_count: 0,
|
||||
query_count: 0,
|
||||
error_count: 0,
|
||||
active_count: 0,
|
||||
idle_count: 0,
|
||||
login_count: 0,
|
||||
tested_count: 0,
|
||||
});
|
||||
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
|
||||
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
|
||||
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
|
||||
entry.query_count += stats.query_count.load(Ordering::Relaxed);
|
||||
entry.error_count += stats.error_count.load(Ordering::Relaxed);
|
||||
match stats.state.load(Ordering::Relaxed) {
|
||||
crate::stats::ServerState::Login => entry.login_count += 1,
|
||||
crate::stats::ServerState::Active => entry.active_count += 1,
|
||||
crate::stats::ServerState::Tested => entry.tested_count += 1,
|
||||
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
||||
}
|
||||
}
|
||||
|
||||
for (_, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
|
||||
if let Some(server_info) = prom_stats.get(&address.name()) {
|
||||
let metrics = [
|
||||
(
|
||||
"bytes_received",
|
||||
server_info.bytes_received.load(Ordering::Relaxed),
|
||||
),
|
||||
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
|
||||
(
|
||||
"transaction_count",
|
||||
server_info.transaction_count.load(Ordering::Relaxed),
|
||||
),
|
||||
(
|
||||
"query_count",
|
||||
server_info.query_count.load(Ordering::Relaxed),
|
||||
),
|
||||
(
|
||||
"error_count",
|
||||
server_info.error_count.load(Ordering::Relaxed),
|
||||
),
|
||||
("bytes_received", server_info.bytes_received),
|
||||
("bytes_sent", server_info.bytes_sent),
|
||||
("transaction_count", server_info.transaction_count),
|
||||
("query_count", server_info.query_count),
|
||||
("error_count", server_info.error_count),
|
||||
("idle_count", server_info.idle_count),
|
||||
("active_count", server_info.active_count),
|
||||
("login_count", server_info.login_count),
|
||||
("tested_count", server_info.tested_count),
|
||||
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
|
||||
("is_paused", if pool.paused() { 1 } else { 0 }),
|
||||
];
|
||||
for (key, value) in metrics {
|
||||
if let Some(prometheus_metric) =
|
||||
@@ -374,14 +440,35 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
|
||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||
let http_service_factory =
|
||||
make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) });
|
||||
let server = Server::bind(&http_addr).serve(http_service_factory);
|
||||
let listener = TcpListener::bind(http_addr);
|
||||
let listener = match listener.await {
|
||||
Ok(listener) => listener,
|
||||
Err(e) => {
|
||||
error!("Failed to bind prometheus server to HTTP address: {}.", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"Exposing prometheus metrics on http://{}/metrics.",
|
||||
http_addr
|
||||
);
|
||||
if let Err(e) = server.await {
|
||||
error!("Failed to run HTTP server: {}.", e);
|
||||
loop {
|
||||
let stream = match listener.accept().await {
|
||||
Ok((stream, _)) => stream,
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(err) = http1::Builder::new()
|
||||
.serve_connection(io, service_fn(prometheus_stats))
|
||||
.await
|
||||
{
|
||||
eprintln!("Error serving HTTP connection for metrics: {:?}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,8 +427,12 @@ impl QueryRouter {
|
||||
None => (),
|
||||
};
|
||||
|
||||
// If we already visited a write statement, we should be going to the primary.
|
||||
if !visited_write_statement {
|
||||
let has_locks = !query.locks.is_empty();
|
||||
|
||||
if has_locks {
|
||||
self.active_role = Some(Role::Primary);
|
||||
} else if !visited_write_statement {
|
||||
// If we already visited a write statement, we should be going to the primary.
|
||||
self.active_role = match self.primary_reads_enabled() {
|
||||
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
|
||||
true => None, // Any server role is fine in this case.
|
||||
@@ -499,6 +503,7 @@ impl QueryRouter {
|
||||
table: _,
|
||||
on: _,
|
||||
returning: _,
|
||||
ignore: _,
|
||||
} => {
|
||||
// Not supported in postgres.
|
||||
assert!(or.is_none());
|
||||
@@ -506,7 +511,9 @@ impl QueryRouter {
|
||||
assert!(after_columns.is_empty());
|
||||
|
||||
Self::process_table(table_name, &mut table_names);
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
||||
if let Some(source) = source {
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
||||
}
|
||||
}
|
||||
Delete {
|
||||
tables,
|
||||
@@ -514,6 +521,8 @@ impl QueryRouter {
|
||||
using,
|
||||
selection,
|
||||
returning: _,
|
||||
order_by: _,
|
||||
limit: _,
|
||||
} => {
|
||||
if let Some(expr) = selection {
|
||||
exprs.push(expr.clone());
|
||||
@@ -1153,6 +1162,29 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_select_for_update() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
|
||||
let queries_in_primary_role = vec![
|
||||
simple_query("BEGIN"), // Transaction start
|
||||
simple_query("SELECT * FROM items WHERE id = 5 FOR UPDATE"),
|
||||
simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
|
||||
];
|
||||
|
||||
for query in queries_in_primary_role {
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
|
||||
// query without lock do not change role
|
||||
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_primary_reads_enabled() {
|
||||
QueryRouter::setup();
|
||||
@@ -1367,6 +1399,19 @@ mod test {
|
||||
assert!(!qr.query_parser_enabled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_query_parser() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
|
||||
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
|
||||
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
|
||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_update_from_pool_settings() {
|
||||
QueryRouter::setup();
|
||||
|
||||
@@ -7,7 +7,7 @@ use lru::LruCache;
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use postgres_protocol::message;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::mem;
|
||||
use std::net::IpAddr;
|
||||
use std::num::NonZeroUsize;
|
||||
@@ -325,6 +325,9 @@ pub struct Server {
|
||||
|
||||
/// Prepared statements
|
||||
prepared_statement_cache: Option<LruCache<String, ()>>,
|
||||
|
||||
/// Prepared statement being currently registered on the server.
|
||||
registering_prepared_statement: VecDeque<String>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
@@ -827,6 +830,7 @@ impl Server {
|
||||
NonZeroUsize::new(prepared_statement_cache_size).unwrap(),
|
||||
)),
|
||||
},
|
||||
registering_prepared_statement: VecDeque::new(),
|
||||
};
|
||||
|
||||
return Ok(server);
|
||||
@@ -956,7 +960,6 @@ impl Server {
|
||||
|
||||
// There is no more data available from the server.
|
||||
self.data_available = false;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -966,6 +969,23 @@ impl Server {
|
||||
self.in_copy_mode = false;
|
||||
}
|
||||
|
||||
// Remove the prepared statement from the cache, it has a syntax error or something else bad happened.
|
||||
if let Some(prepared_stmt_name) =
|
||||
self.registering_prepared_statement.pop_front()
|
||||
{
|
||||
if let Some(ref mut cache) = self.prepared_statement_cache {
|
||||
if let Some(_removed) = cache.pop(&prepared_stmt_name) {
|
||||
debug!(
|
||||
"Removed {} from prepared statement cache",
|
||||
prepared_stmt_name
|
||||
);
|
||||
} else {
|
||||
// Shouldn't happen.
|
||||
debug!("Prepared statement {} was not cached", prepared_stmt_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.prepared_statement_cache.is_some() {
|
||||
let error_message = PgErrorMsg::parse(&message)?;
|
||||
if error_message.message == "cached plan must not change result type" {
|
||||
@@ -1068,6 +1088,11 @@ impl Server {
|
||||
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
|
||||
'c' => (),
|
||||
|
||||
// Parse complete successfully
|
||||
'1' => {
|
||||
self.registering_prepared_statement.pop_front();
|
||||
}
|
||||
|
||||
// Anything else, e.g. errors, notices, etc.
|
||||
// Keep buffering until ReadyForQuery shows up.
|
||||
_ => (),
|
||||
@@ -1107,7 +1132,7 @@ impl Server {
|
||||
has_it
|
||||
}
|
||||
|
||||
pub fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
||||
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
||||
let cache = match &mut self.prepared_statement_cache {
|
||||
Some(cache) => cache,
|
||||
None => return None,
|
||||
@@ -1129,7 +1154,7 @@ impl Server {
|
||||
None
|
||||
}
|
||||
|
||||
pub fn remove_prepared_statement_from_cache(&mut self, name: &str) {
|
||||
fn remove_prepared_statement_from_cache(&mut self, name: &str) {
|
||||
let cache = match &mut self.prepared_statement_cache {
|
||||
Some(cache) => cache,
|
||||
None => return,
|
||||
@@ -1145,6 +1170,9 @@ impl Server {
|
||||
should_send_parse_to_server: bool,
|
||||
) -> Result<(), Error> {
|
||||
if !self.has_prepared_statement(&parse.name) {
|
||||
self.registering_prepared_statement
|
||||
.push_back(parse.name.clone());
|
||||
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
if should_send_parse_to_server {
|
||||
@@ -1176,7 +1204,13 @@ impl Server {
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
// If it's not there, something went bad, I'm guessing bad syntax or permissions error
|
||||
// on the server.
|
||||
if !self.has_prepared_statement(&parse.name) {
|
||||
Err(Error::PreparedStatementError)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// If the server is still inside a transaction.
|
||||
@@ -1186,6 +1220,7 @@ impl Server {
|
||||
self.in_transaction
|
||||
}
|
||||
|
||||
/// Currently copying data from client to server or vice-versa.
|
||||
pub fn in_copy_mode(&self) -> bool {
|
||||
self.in_copy_mode
|
||||
}
|
||||
@@ -1244,8 +1279,8 @@ impl Server {
|
||||
}
|
||||
|
||||
/// Indicate that this server connection cannot be re-used and must be discarded.
|
||||
pub fn mark_bad(&mut self) {
|
||||
error!("Server {:?} marked bad", self.address);
|
||||
pub fn mark_bad(&mut self, reason: &str) {
|
||||
error!("Server {:?} marked bad, reason: {}", self.address, reason);
|
||||
self.bad = true;
|
||||
}
|
||||
|
||||
|
||||
@@ -14,11 +14,11 @@ pub enum ShardingFunction {
|
||||
Sha1,
|
||||
}
|
||||
|
||||
impl ToString for ShardingFunction {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
|
||||
ShardingFunction::Sha1 => "sha1".to_string(),
|
||||
impl std::fmt::Display for ShardingFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"),
|
||||
ShardingFunction::Sha1 => write!(f, "sha1"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,11 @@ pub struct ClientStats {
|
||||
/// Maximum time spent waiting for a connection from pool, measures in microseconds
|
||||
pub max_wait_time: Arc<AtomicU64>,
|
||||
|
||||
// Time when the client started waiting for a connection from pool, measures in microseconds
|
||||
// We use connect_time as the reference point for this value
|
||||
// U64 can represent ~5850 centuries in microseconds, so we should be fine
|
||||
pub wait_start_us: Arc<AtomicU64>,
|
||||
|
||||
/// Current state of the client
|
||||
pub state: Arc<AtomicClientState>,
|
||||
|
||||
@@ -64,6 +69,7 @@ impl Default for ClientStats {
|
||||
pool_name: String::new(),
|
||||
total_wait_time: Arc::new(AtomicU64::new(0)),
|
||||
max_wait_time: Arc::new(AtomicU64::new(0)),
|
||||
wait_start_us: Arc::new(AtomicU64::new(0)),
|
||||
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
|
||||
transaction_count: Arc::new(AtomicU64::new(0)),
|
||||
query_count: Arc::new(AtomicU64::new(0)),
|
||||
@@ -111,6 +117,9 @@ impl ClientStats {
|
||||
|
||||
/// Reports a client is waiting for a connection
|
||||
pub fn waiting(&self) {
|
||||
let wait_start = self.connect_time.elapsed().as_micros() as u64;
|
||||
|
||||
self.wait_start_us.store(wait_start, Ordering::Relaxed);
|
||||
self.state.store(ClientState::Waiting, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
@@ -122,6 +131,13 @@ impl ClientStats {
|
||||
/// Reports a client has failed to obtain a connection from a connection pool
|
||||
pub fn checkout_error(&self) {
|
||||
self.state.store(ClientState::Idle, Ordering::Relaxed);
|
||||
self.update_wait_times();
|
||||
}
|
||||
|
||||
/// Reports a client has succeeded in obtaining a connection from a connection pool
|
||||
pub fn checkout_success(&self) {
|
||||
self.state.store(ClientState::Active, Ordering::Relaxed);
|
||||
self.update_wait_times();
|
||||
}
|
||||
|
||||
/// Reports a client has had the server assigned to it be banned
|
||||
@@ -130,12 +146,26 @@ impl ClientStats {
|
||||
self.error_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Reporters the time spent by a client waiting to get a healthy connection from the pool
|
||||
pub fn checkout_time(&self, microseconds: u64) {
|
||||
fn update_wait_times(&self) {
|
||||
if self.wait_start_us.load(Ordering::Relaxed) == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let wait_time_us = self.get_current_wait_time_us();
|
||||
self.total_wait_time
|
||||
.fetch_add(microseconds, Ordering::Relaxed);
|
||||
.fetch_add(wait_time_us, Ordering::Relaxed);
|
||||
self.max_wait_time
|
||||
.fetch_max(microseconds, Ordering::Relaxed);
|
||||
.fetch_max(wait_time_us, Ordering::Relaxed);
|
||||
self.wait_start_us.store(0, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get_current_wait_time_us(&self) -> u64 {
|
||||
let wait_start_us = self.wait_start_us.load(Ordering::Relaxed);
|
||||
let microseconds_since_connection_epoch = self.connect_time.elapsed().as_micros() as u64;
|
||||
if wait_start_us == 0 || microseconds_since_connection_epoch < wait_start_us {
|
||||
return 0;
|
||||
}
|
||||
microseconds_since_connection_epoch - wait_start_us
|
||||
}
|
||||
|
||||
/// Report a query executed by a client against a server
|
||||
|
||||
@@ -64,8 +64,11 @@ impl PoolStats {
|
||||
ClientState::Idle => pool_stats.cl_idle += 1,
|
||||
ClientState::Waiting => pool_stats.cl_waiting += 1,
|
||||
}
|
||||
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
|
||||
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
|
||||
let wait_start_us = client.wait_start_us.load(Ordering::Relaxed);
|
||||
if wait_start_us > 0 {
|
||||
let wait_time_us = client.get_current_wait_time_us();
|
||||
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time_us);
|
||||
}
|
||||
}
|
||||
None => debug!("Client from an obselete pool"),
|
||||
}
|
||||
|
||||
34
start_test_env.sh
Executable file
34
start_test_env.sh
Executable file
@@ -0,0 +1,34 @@
|
||||
GREEN="\033[0;32m"
|
||||
RED="\033[0;31m"
|
||||
BLUE="\033[0;34m"
|
||||
RESET="\033[0m"
|
||||
|
||||
|
||||
cd tests/docker/
|
||||
docker compose kill main || true
|
||||
docker compose build main
|
||||
docker compose down
|
||||
docker compose up -d
|
||||
# wait for the container to start
|
||||
while ! docker compose exec main ls; do
|
||||
echo "Waiting for test environment to start"
|
||||
sleep 1
|
||||
done
|
||||
echo "==================================="
|
||||
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
|
||||
docker compose exec --workdir /app main cargo build
|
||||
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
|
||||
docker compose exec --workdir /app/tests/ruby main bundle install
|
||||
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
|
||||
echo "Interactive test environment ready"
|
||||
echo "To run integration tests, you can use the following commands:"
|
||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
|
||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||
echo "You can rebuild PgCat from within the container by running"
|
||||
echo -e " ${GREEN}cargo build${RESET}"
|
||||
echo "and then run the tests again"
|
||||
echo "==================================="
|
||||
docker compose exec --workdir /app/tests main bash
|
||||
@@ -8,3 +8,6 @@ RUN rustup component add llvm-tools-preview
|
||||
RUN sudo gem install bundler
|
||||
RUN wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
|
||||
sudo dpkg -i toxiproxy-2.4.0.deb
|
||||
RUN wget -O go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||
sudo tar -C /usr/local -xzf go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||
rm go1.21.3.linux-$(dpkg --print-architecture).tar.gz
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
version: "3"
|
||||
services:
|
||||
pg1:
|
||||
image: postgres:14
|
||||
@@ -48,6 +47,8 @@ services:
|
||||
main:
|
||||
build: .
|
||||
command: ["bash", "/app/tests/docker/run.sh"]
|
||||
environment:
|
||||
- INTERACTIVE_TEST_ENVIRONMENT=true
|
||||
volumes:
|
||||
- ../../:/app/
|
||||
- /app/target/
|
||||
|
||||
@@ -5,6 +5,38 @@ rm /app/*.profraw || true
|
||||
rm /app/pgcat.profdata || true
|
||||
rm -rf /app/cov || true
|
||||
|
||||
# Prepares the interactive test environment
|
||||
#
|
||||
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
|
||||
ports=(5432 7432 8432 9432 10432)
|
||||
for port in "${ports[@]}"; do
|
||||
is_it_up=0
|
||||
attempts=0
|
||||
while [ $is_it_up -eq 0 ]; do
|
||||
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "PostgreSQL on port $port is up."
|
||||
is_it_up=1
|
||||
else
|
||||
attempts=$((attempts+1))
|
||||
if [ $attempts -gt 10 ]; then
|
||||
echo "PostgreSQL on port $port is down, giving up."
|
||||
exit 1
|
||||
fi
|
||||
echo "PostgreSQL on port $port is down, waiting for it to start."
|
||||
sleep 1
|
||||
fi
|
||||
done
|
||||
done
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||
sleep 100000000000000000
|
||||
exit 0
|
||||
fi
|
||||
|
||||
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
||||
export RUSTC_BOOTSTRAP=1
|
||||
export CARGO_INCREMENTAL=0
|
||||
|
||||
5
tests/go/go.mod
Normal file
5
tests/go/go.mod
Normal file
@@ -0,0 +1,5 @@
|
||||
module pgcat
|
||||
|
||||
go 1.21
|
||||
|
||||
require github.com/lib/pq v1.10.9
|
||||
2
tests/go/go.sum
Normal file
2
tests/go/go.sum
Normal file
@@ -0,0 +1,2 @@
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
162
tests/go/pgcat.toml
Normal file
162
tests/go/pgcat.toml
Normal file
@@ -0,0 +1,162 @@
|
||||
#
|
||||
# PgCat config example.
|
||||
#
|
||||
|
||||
#
|
||||
# General pooler settings
|
||||
[general]
|
||||
# What IP to run on, 0.0.0.0 means accessible from everywhere.
|
||||
host = "0.0.0.0"
|
||||
|
||||
# Port to run on, same as PgBouncer used in this example.
|
||||
port = "${PORT}"
|
||||
|
||||
# Whether to enable prometheus exporter or not.
|
||||
enable_prometheus_exporter = true
|
||||
|
||||
# Port at which prometheus exporter listens on.
|
||||
prometheus_exporter_port = 9930
|
||||
|
||||
# How long to wait before aborting a server connection (ms).
|
||||
connect_timeout = 1000
|
||||
|
||||
# How much time to give the health check query to return with a result (ms).
|
||||
healthcheck_timeout = 1000
|
||||
|
||||
# How long to keep connection available for immediate re-use, without running a healthcheck query on it
|
||||
healthcheck_delay = 30000
|
||||
|
||||
# How much time to give clients during shutdown before forcibly killing client connections (ms).
|
||||
shutdown_timeout = 5000
|
||||
|
||||
# For how long to ban a server if it fails a health check (seconds).
|
||||
ban_time = 60 # Seconds
|
||||
|
||||
# If we should log client connections
|
||||
log_client_connections = false
|
||||
|
||||
# If we should log client disconnections
|
||||
log_client_disconnections = false
|
||||
|
||||
# Reload config automatically if it changes.
|
||||
autoreload = 15000
|
||||
|
||||
server_round_robin = false
|
||||
|
||||
# TLS
|
||||
tls_certificate = "../../.circleci/server.cert"
|
||||
tls_private_key = "../../.circleci/server.key"
|
||||
|
||||
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
|
||||
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
|
||||
admin_username = "admin_user"
|
||||
admin_password = "admin_pass"
|
||||
|
||||
# pool
|
||||
# configs are structured as pool.<pool_name>
|
||||
# the pool_name is what clients use as database name when connecting
|
||||
# For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded_db"
|
||||
[pools.sharded_db]
|
||||
# Pool mode (see PgBouncer docs for more).
|
||||
# session: one server connection per connected client
|
||||
# transaction: one server connection per client transaction
|
||||
pool_mode = "transaction"
|
||||
|
||||
# If the client doesn't specify, route traffic to
|
||||
# this role by default.
|
||||
#
|
||||
# any: round-robin between primary and replicas,
|
||||
# replica: round-robin between replicas only without touching the primary,
|
||||
# primary: all queries go to the primary unless otherwise specified.
|
||||
default_role = "any"
|
||||
|
||||
# Query parser. If enabled, we'll attempt to parse
|
||||
# every incoming query to determine if it's a read or a write.
|
||||
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
|
||||
# we'll direct it to the primary.
|
||||
query_parser_enabled = true
|
||||
|
||||
# If the query parser is enabled and this setting is enabled, we'll attempt to
|
||||
# infer the role from the query itself.
|
||||
query_parser_read_write_splitting = true
|
||||
|
||||
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
|
||||
# load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
# queries. The primary can always be explicitely selected with our custom protocol.
|
||||
primary_reads_enabled = true
|
||||
|
||||
# So what if you wanted to implement a different hashing function,
|
||||
# or you've already built one and you want this pooler to use it?
|
||||
#
|
||||
# Current options:
|
||||
#
|
||||
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
|
||||
# sha1: A hashing function based on SHA1
|
||||
#
|
||||
sharding_function = "pg_bigint_hash"
|
||||
|
||||
# Prepared statements cache size.
|
||||
prepared_statements_cache_size = 500
|
||||
|
||||
# Credentials for users that may connect to this cluster
|
||||
[pools.sharded_db.users.0]
|
||||
username = "sharding_user"
|
||||
password = "sharding_user"
|
||||
# Maximum number of server connections that can be established for this user
|
||||
# The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||
# is the sum of pool_size across all users.
|
||||
pool_size = 5
|
||||
statement_timeout = 0
|
||||
|
||||
|
||||
[pools.sharded_db.users.1]
|
||||
username = "other_user"
|
||||
password = "other_user"
|
||||
pool_size = 21
|
||||
statement_timeout = 30000
|
||||
|
||||
# Shard 0
|
||||
[pools.sharded_db.shards.0]
|
||||
# [ host, port, role ]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5432, "replica" ]
|
||||
]
|
||||
# Database name (e.g. "postgres")
|
||||
database = "shard0"
|
||||
|
||||
[pools.sharded_db.shards.1]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5432, "replica" ],
|
||||
]
|
||||
database = "shard1"
|
||||
|
||||
[pools.sharded_db.shards.2]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5432, "replica" ],
|
||||
]
|
||||
database = "shard2"
|
||||
|
||||
|
||||
[pools.simple_db]
|
||||
pool_mode = "session"
|
||||
default_role = "primary"
|
||||
query_parser_enabled = true
|
||||
query_parser_read_write_splitting = true
|
||||
primary_reads_enabled = true
|
||||
sharding_function = "pg_bigint_hash"
|
||||
|
||||
[pools.simple_db.users.0]
|
||||
username = "simple_user"
|
||||
password = "simple_user"
|
||||
pool_size = 5
|
||||
statement_timeout = 30000
|
||||
|
||||
[pools.simple_db.shards.0]
|
||||
servers = [
|
||||
[ "127.0.0.1", 5432, "primary" ],
|
||||
[ "localhost", 5432, "replica" ]
|
||||
]
|
||||
database = "some_db"
|
||||
52
tests/go/prepared_test.go
Normal file
52
tests/go/prepared_test.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package pgcat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
_ "github.com/lib/pq"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test(t *testing.T) {
|
||||
t.Cleanup(setup(t))
|
||||
t.Run("Named parameterized prepared statement works", namedParameterizedPreparedStatement)
|
||||
t.Run("Unnamed parameterized prepared statement works", unnamedParameterizedPreparedStatement)
|
||||
}
|
||||
|
||||
func namedParameterizedPreparedStatement(t *testing.T) {
|
||||
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
|
||||
if err != nil {
|
||||
t.Fatalf("could not open connection: %+v", err)
|
||||
}
|
||||
|
||||
stmt, err := db.Prepare("SELECT $1")
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("could not prepare: %+v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
rows, err := stmt.Query(1)
|
||||
if err != nil {
|
||||
t.Fatalf("could not query: %+v", err)
|
||||
}
|
||||
_ = rows.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func unnamedParameterizedPreparedStatement(t *testing.T) {
|
||||
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
|
||||
if err != nil {
|
||||
t.Fatalf("could not open connection: %+v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
// Under the hood QueryContext generates an unnamed parameterized prepared statement
|
||||
rows, err := db.QueryContext(context.Background(), "SELECT $1", 1)
|
||||
if err != nil {
|
||||
t.Fatalf("could not query: %+v", err)
|
||||
}
|
||||
_ = rows.Close()
|
||||
}
|
||||
}
|
||||
81
tests/go/setup.go
Normal file
81
tests/go/setup.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package pgcat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
//go:embed pgcat.toml
|
||||
var pgcatCfg string
|
||||
|
||||
var port = rand.Intn(32760-20000) + 20000
|
||||
|
||||
func setup(t *testing.T) func() {
|
||||
cfg, err := os.CreateTemp("/tmp", "pgcat_cfg_*.toml")
|
||||
if err != nil {
|
||||
t.Fatalf("could not create temp file: %+v", err)
|
||||
}
|
||||
|
||||
pgcatCfg = strings.Replace(pgcatCfg, "\"${PORT}\"", fmt.Sprintf("%d", port), 1)
|
||||
|
||||
_, err = cfg.Write([]byte(pgcatCfg))
|
||||
if err != nil {
|
||||
t.Fatalf("could not write temp file: %+v", err)
|
||||
}
|
||||
|
||||
commandPath := "../../target/debug/pgcat"
|
||||
if os.Getenv("CARGO_TARGET_DIR") != "" {
|
||||
commandPath = os.Getenv("CARGO_TARGET_DIR") + "/debug/pgcat"
|
||||
}
|
||||
|
||||
cmd := exec.Command(commandPath, cfg.Name())
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
go func() {
|
||||
err = cmd.Run()
|
||||
if err != nil {
|
||||
t.Errorf("could not run pgcat: %+v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
deadline, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
|
||||
defer cancelFunc()
|
||||
for {
|
||||
select {
|
||||
case <-deadline.Done():
|
||||
break
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=pgcat user=admin_user password=admin_pass sslmode=disable", port))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
rows, err := db.QueryContext(deadline, "SHOW STATS")
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_ = rows.Close()
|
||||
_ = db.Close()
|
||||
break
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
return func() {
|
||||
err := cmd.Process.Signal(os.Interrupt)
|
||||
if err != nil {
|
||||
t.Fatalf("could not interrupt pgcat: %+v", err)
|
||||
}
|
||||
err = os.Remove(cfg.Name())
|
||||
if err != nil {
|
||||
t.Fatalf("could not remove temp file: %+v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -91,6 +91,27 @@ describe "Admin" do
|
||||
end
|
||||
end
|
||||
|
||||
[
|
||||
"SHOW ME THE MONEY",
|
||||
"SHOW ME THE WAY",
|
||||
"SHOW UP",
|
||||
"SHOWTIME",
|
||||
"HAMMER TIME",
|
||||
"SHOWN TO BE TRUE",
|
||||
"SHOW ",
|
||||
"SHOW ",
|
||||
"SHOW 1",
|
||||
";;;;;"
|
||||
].each do |cmd|
|
||||
describe "Bad command #{cmd}" do
|
||||
it "does not panic and responds with PG::SystemError" do
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
expect { admin_conn.async_exec(cmd) }.to raise_error(PG::SystemError).with_message(/Unsupported/)
|
||||
admin_conn.close
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "PAUSE" do
|
||||
it "pauses all pools" do
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
|
||||
@@ -233,17 +233,19 @@ describe "Stats" do
|
||||
sleep(1.1) # Allow time for stats to update
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
|
||||
|
||||
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
|
||||
expect(results["maxwait"]).to eq("1")
|
||||
expect(results["cl_waiting"]).to eq("2")
|
||||
expect(results["cl_active"]).to eq("2")
|
||||
expect(results["sv_active"]).to eq("2")
|
||||
|
||||
sleep(2.5) # Allow time for stats to update
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("4")
|
||||
@@ -255,22 +257,23 @@ describe "Stats" do
|
||||
|
||||
it "show correct max_wait" do
|
||||
threads = []
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
|
||||
end
|
||||
sleep(1.1)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
# Value is only reported when there are clients waiting
|
||||
expect(results["maxwait"]).to eq("1")
|
||||
expect(results["maxwait_us"].to_i).to be_within(20_000).of(100_000)
|
||||
|
||||
sleep(2.5) # Allow time for stats to update
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
|
||||
expect(results["maxwait"]).to eq("1")
|
||||
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
|
||||
connections.map(&:close)
|
||||
|
||||
sleep(4.5) # Allow time for stats to update
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
# no clients are waiting so value is 0
|
||||
expect(results["maxwait"]).to eq("0")
|
||||
expect(results["maxwait_us"]).to eq("0")
|
||||
connections.map(&:close)
|
||||
|
||||
threads.map(&:join)
|
||||
end
|
||||
@@ -329,6 +332,40 @@ describe "Stats" do
|
||||
admin_conn.close
|
||||
connections.map(&:close)
|
||||
end
|
||||
|
||||
context "when client has waited for a server" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
|
||||
|
||||
it "shows correct maxwait" do
|
||||
threads = []
|
||||
connections = Array.new(3) { |i| PG::connect("#{pgcat_conn_str}?application_name=app#{i}") }
|
||||
connections.each do |c|
|
||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
|
||||
end
|
||||
|
||||
sleep(2.5) # Allow time for stats to update
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||
|
||||
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
|
||||
|
||||
non_waiting_clients = normal_client_results.select { |c| c["maxwait"] == "0" }
|
||||
waiting_clients = normal_client_results.select { |c| c["maxwait"].to_i > 0 }
|
||||
|
||||
expect(non_waiting_clients.count).to eq(2)
|
||||
non_waiting_clients.each do |client|
|
||||
expect(client["maxwait_us"].to_i).to be_between(0, 50_000)
|
||||
end
|
||||
|
||||
expect(waiting_clients.count).to eq(1)
|
||||
waiting_clients.each do |client|
|
||||
expect(client["maxwait_us"].to_i).to be_within(200_000).of(500_000)
|
||||
end
|
||||
|
||||
admin_conn.close
|
||||
connections.map(&:close)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
682
tests/rust/Cargo.lock
generated
682
tests/rust/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -15,8 +15,13 @@ async fn test_prepared_statements() {
|
||||
for _ in 0..5 {
|
||||
let pool = pool.clone();
|
||||
let handle = tokio::task::spawn(async move {
|
||||
for _ in 0..1000 {
|
||||
sqlx::query("SELECT 1").fetch_all(&pool).await.unwrap();
|
||||
for i in 0..1000 {
|
||||
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
panic!("prepared statement error: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ mkdir -p "$deb_dir/etc/systemd/system"
|
||||
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
|
||||
chmod +x "$deb_dir/usr/bin/pgcat"
|
||||
|
||||
cp pgcat.toml "$deb_dir/etc/pgcat.toml"
|
||||
cp pgcat.toml "$deb_dir/etc/pgcat.example.toml"
|
||||
cp pgcat.service "$deb_dir/etc/systemd/system/pgcat.service"
|
||||
|
||||
(cat control | envsubst) > "$deb_dir/DEBIAN/control"
|
||||
|
||||
Reference in New Issue
Block a user