mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 18:56:30 +00:00
Compare commits
1 Commits
mostafa_fi
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26dd78d0aa |
@@ -59,7 +59,6 @@ admin_password = "admin_pass"
|
|||||||
# session: one server connection per connected client
|
# session: one server connection per connected client
|
||||||
# transaction: one server connection per client transaction
|
# transaction: one server connection per client transaction
|
||||||
pool_mode = "transaction"
|
pool_mode = "transaction"
|
||||||
prepared_statements_cache_size = 500
|
|
||||||
|
|
||||||
# If the client doesn't specify, route traffic to
|
# If the client doesn't specify, route traffic to
|
||||||
# this role by default.
|
# this role by default.
|
||||||
@@ -142,7 +141,6 @@ query_parser_enabled = true
|
|||||||
query_parser_read_write_splitting = true
|
query_parser_read_write_splitting = true
|
||||||
primary_reads_enabled = true
|
primary_reads_enabled = true
|
||||||
sharding_function = "pg_bigint_hash"
|
sharding_function = "pg_bigint_hash"
|
||||||
prepared_statements_cache_size = 500
|
|
||||||
|
|
||||||
[pools.simple_db.users.0]
|
[pools.simple_db.users.0]
|
||||||
username = "simple_user"
|
username = "simple_user"
|
||||||
|
|||||||
16
.github/workflows/build-and-push.yaml
vendored
16
.github/workflows/build-and-push.yaml
vendored
@@ -23,17 +23,14 @@ jobs:
|
|||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout Repository
|
- name: Checkout Repository
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v3
|
||||||
|
|
||||||
- name: Set up QEMU
|
|
||||||
uses: docker/setup-qemu-action@v3
|
|
||||||
|
|
||||||
- name: Set up Docker Buildx
|
- name: Set up Docker Buildx
|
||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v2
|
||||||
|
|
||||||
- name: Determine tags
|
- name: Determine tags
|
||||||
id: metadata
|
id: metadata
|
||||||
uses: docker/metadata-action@v5
|
uses: docker/metadata-action@v4
|
||||||
with:
|
with:
|
||||||
images: ${{ env.registry }}/${{ env.image-name }}
|
images: ${{ env.registry }}/${{ env.image-name }}
|
||||||
tags: |
|
tags: |
|
||||||
@@ -45,18 +42,15 @@ jobs:
|
|||||||
type=raw,value=latest,enable={{ is_default_branch }}
|
type=raw,value=latest,enable={{ is_default_branch }}
|
||||||
|
|
||||||
- name: Log in to the Container registry
|
- name: Log in to the Container registry
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v2.1.0
|
||||||
with:
|
with:
|
||||||
registry: ${{ env.registry }}
|
registry: ${{ env.registry }}
|
||||||
username: ${{ github.actor }}
|
username: ${{ github.actor }}
|
||||||
password: ${{ secrets.GITHUB_TOKEN }}
|
password: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
|
||||||
- name: Build and push ${{ env.image-name }}
|
- name: Build and push ${{ env.image-name }}
|
||||||
uses: docker/build-push-action@v6
|
uses: docker/build-push-action@v3
|
||||||
with:
|
with:
|
||||||
context: .
|
|
||||||
platforms: linux/amd64,linux/arm64
|
|
||||||
provenance: false
|
|
||||||
push: true
|
push: true
|
||||||
tags: ${{ steps.metadata.outputs.tags }}
|
tags: ${{ steps.metadata.outputs.tags }}
|
||||||
labels: ${{ steps.metadata.outputs.labels }}
|
labels: ${{ steps.metadata.outputs.labels }}
|
||||||
|
|||||||
@@ -6,32 +6,6 @@ Thank you for contributing! Just a few tips here:
|
|||||||
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
||||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||||
|
|
||||||
## How to run the integration tests locally and iterate on them
|
|
||||||
We have integration tests written in Ruby, Python, Go and Rust.
|
|
||||||
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
|
||||||
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
|
||||||
|
|
||||||
|
|
||||||
Quite simply, make sure you have docker installed and then run
|
|
||||||
`./start_test_env.sh`
|
|
||||||
|
|
||||||
That is it!
|
|
||||||
|
|
||||||
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
|
|
||||||
|
|
||||||
Once the environment is ready, you can run the tests by running
|
|
||||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
|
||||||
Python: `cd /app && python3 tests/python/tests.py`
|
|
||||||
Rust: `cd /app/tests/rust && cargo run`
|
|
||||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
|
||||||
|
|
||||||
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
|
|
||||||
To rebuild PgCat, just run `cargo build` within the container under `/app`
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Happy hacking!
|
Happy hacking!
|
||||||
|
|
||||||
## TODOs
|
## TODOs
|
||||||
|
|||||||
98
Cargo.lock
generated
98
Cargo.lock
generated
@@ -146,12 +146,6 @@ dependencies = [
|
|||||||
"syn 2.0.26",
|
"syn 2.0.26",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "atomic-waker"
|
|
||||||
version = "1.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "atomic_enum"
|
name = "atomic_enum"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
@@ -548,23 +542,29 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "h2"
|
name = "h2"
|
||||||
version = "0.4.6"
|
version = "0.3.20"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
|
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atomic-waker",
|
|
||||||
"bytes",
|
"bytes",
|
||||||
"fnv",
|
"fnv",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-sink",
|
"futures-sink",
|
||||||
|
"futures-util",
|
||||||
"http",
|
"http",
|
||||||
"indexmap",
|
"indexmap 1.9.3",
|
||||||
"slab",
|
"slab",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hashbrown"
|
||||||
|
version = "0.12.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hashbrown"
|
name = "hashbrown"
|
||||||
version = "0.14.0"
|
version = "0.14.0"
|
||||||
@@ -609,9 +609,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http"
|
name = "http"
|
||||||
version = "1.1.0"
|
version = "0.2.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
|
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"fnv",
|
"fnv",
|
||||||
@@ -620,24 +620,12 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http-body"
|
name = "http-body"
|
||||||
version = "1.0.1"
|
version = "0.4.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
|
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"http",
|
"http",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "http-body-util"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"futures-util",
|
|
||||||
"http",
|
|
||||||
"http-body",
|
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -655,12 +643,13 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hyper"
|
name = "hyper"
|
||||||
version = "1.4.1"
|
version = "0.14.27"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
|
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
|
"futures-core",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"h2",
|
"h2",
|
||||||
"http",
|
"http",
|
||||||
@@ -669,26 +658,13 @@ dependencies = [
|
|||||||
"httpdate",
|
"httpdate",
|
||||||
"itoa",
|
"itoa",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"smallvec",
|
"socket2 0.4.9",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
"want",
|
"want",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "hyper-util"
|
|
||||||
version = "0.1.7"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"futures-util",
|
|
||||||
"http",
|
|
||||||
"http-body",
|
|
||||||
"hyper",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "iana-time-zone"
|
name = "iana-time-zone"
|
||||||
version = "0.1.57"
|
version = "0.1.57"
|
||||||
@@ -733,6 +709,16 @@ dependencies = [
|
|||||||
"unicode-normalization",
|
"unicode-normalization",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "indexmap"
|
||||||
|
version = "1.9.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
"hashbrown 0.12.3",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "indexmap"
|
name = "indexmap"
|
||||||
version = "2.0.0"
|
version = "2.0.0"
|
||||||
@@ -740,7 +726,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
|
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"equivalent",
|
"equivalent",
|
||||||
"hashbrown",
|
"hashbrown 0.14.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -774,9 +760,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itertools"
|
name = "itertools"
|
||||||
version = "0.10.5"
|
version = "0.13.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
|
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"either",
|
"either",
|
||||||
]
|
]
|
||||||
@@ -862,7 +848,7 @@ version = "0.12.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
|
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"hashbrown",
|
"hashbrown 0.14.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1034,7 +1020,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.2.0"
|
version = "1.1.2-dev4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -1048,9 +1034,7 @@ dependencies = [
|
|||||||
"fallible-iterator",
|
"fallible-iterator",
|
||||||
"futures",
|
"futures",
|
||||||
"hmac",
|
"hmac",
|
||||||
"http-body-util",
|
|
||||||
"hyper",
|
"hyper",
|
||||||
"hyper-util",
|
|
||||||
"itertools",
|
"itertools",
|
||||||
"jemallocator",
|
"jemallocator",
|
||||||
"log",
|
"log",
|
||||||
@@ -1494,9 +1478,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "smallvec"
|
name = "smallvec"
|
||||||
version = "1.13.2"
|
version = "1.11.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "socket2"
|
name = "socket2"
|
||||||
@@ -1757,13 +1741,19 @@ version = "0.19.14"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
|
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"indexmap",
|
"indexmap 2.0.0",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_spanned",
|
"serde_spanned",
|
||||||
"toml_datetime",
|
"toml_datetime",
|
||||||
"winnow",
|
"winnow",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-service"
|
||||||
|
version = "0.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing"
|
name = "tracing"
|
||||||
version = "0.1.37"
|
version = "0.1.37"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.2.0"
|
version = "1.1.2-dev4"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
@@ -29,9 +29,7 @@ base64 = "0.21"
|
|||||||
stringprep = "0.1"
|
stringprep = "0.1"
|
||||||
tokio-rustls = "0.24"
|
tokio-rustls = "0.24"
|
||||||
rustls-pemfile = "1"
|
rustls-pemfile = "1"
|
||||||
http-body-util = "0.1.2"
|
hyper = { version = "0.14", features = ["full"] }
|
||||||
hyper = { version = "1.4.1", features = ["full"] }
|
|
||||||
hyper-util = { version = "0.1.7", features = ["tokio"] }
|
|
||||||
phf = { version = "0.11.1", features = ["macros"] }
|
phf = { version = "0.11.1", features = ["macros"] }
|
||||||
exitcode = "1.1.2"
|
exitcode = "1.1.2"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
@@ -46,7 +44,7 @@ rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
|||||||
trust-dns-resolver = "0.22.0"
|
trust-dns-resolver = "0.22.0"
|
||||||
tokio-test = "0.4.2"
|
tokio-test = "0.4.2"
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
itertools = "0.10"
|
itertools = "0.13"
|
||||||
clap = { version = "4.3.1", features = ["derive", "env"] }
|
clap = { version = "4.3.1", features = ["derive", "env"] }
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
tracing-subscriber = { version = "0.3.17", features = [
|
tracing-subscriber = { version = "0.3.17", features = [
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM rust:1.79.0-slim-bookworm AS builder
|
FROM rust:1-slim-bookworm AS builder
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get install -y build-essential
|
apt-get install -y build-essential
|
||||||
@@ -19,4 +19,3 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
|
|||||||
WORKDIR /etc/pgcat
|
WORKDIR /etc/pgcat
|
||||||
ENV RUST_LOG=info
|
ENV RUST_LOG=info
|
||||||
CMD ["pgcat"]
|
CMD ["pgcat"]
|
||||||
STOPSIGNAL SIGINT
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM cimg/rust:1.79.0
|
FROM cimg/rust:1.67.1
|
||||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||||
RUN /bin/yj -h
|
RUN /bin/yj -h
|
||||||
RUN sudo apt-get update && \
|
RUN sudo apt-get update && \
|
||||||
|
|||||||
@@ -268,8 +268,6 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
|
|||||||
|
|
||||||
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
|
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
|
||||||
|
|
||||||
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
|
|
||||||
|
|
||||||
### Live configuration reloading
|
### Live configuration reloading
|
||||||
|
|
||||||
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
|
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
|
||||||
|
|||||||
@@ -4,5 +4,5 @@ description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBounce
|
|||||||
maintainers:
|
maintainers:
|
||||||
- name: Wildcard
|
- name: Wildcard
|
||||||
email: support@w6d.io
|
email: support@w6d.io
|
||||||
appVersion: "1.2.0"
|
appVersion: "1.1.1"
|
||||||
version: 0.2.0
|
version: 0.1.0
|
||||||
|
|||||||
@@ -240,15 +240,7 @@ configuration:
|
|||||||
## the pool_name is what clients use as database name when connecting
|
## the pool_name is what clients use as database name when connecting
|
||||||
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
|
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
|
||||||
## @param [object]
|
## @param [object]
|
||||||
pools:
|
pools: []
|
||||||
[{
|
|
||||||
name: "simple", pool_mode: "transaction",
|
|
||||||
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
|
|
||||||
shards: [{
|
|
||||||
servers: [{host: "postgres", port: 5432, role: "primary"}],
|
|
||||||
database: "postgres"
|
|
||||||
}]
|
|
||||||
}]
|
|
||||||
# - ## default values
|
# - ## default values
|
||||||
# ##
|
# ##
|
||||||
# ##
|
# ##
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -11,7 +11,6 @@ RestartSec=1
|
|||||||
Environment=RUST_LOG=info
|
Environment=RUST_LOG=info
|
||||||
LimitNOFILE=65536
|
LimitNOFILE=65536
|
||||||
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
|
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
|
||||||
ExecReload=/bin/kill -SIGHUP $MAINPID
|
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=multi-user.target
|
WantedBy=multi-user.target
|
||||||
|
|||||||
14
src/admin.rs
14
src/admin.rs
@@ -55,12 +55,7 @@ where
|
|||||||
|
|
||||||
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
|
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
|
||||||
|
|
||||||
match query_parts
|
match query_parts[0].to_ascii_uppercase().as_str() {
|
||||||
.first()
|
|
||||||
.unwrap_or(&"")
|
|
||||||
.to_ascii_uppercase()
|
|
||||||
.as_str()
|
|
||||||
{
|
|
||||||
"BAN" => {
|
"BAN" => {
|
||||||
trace!("BAN");
|
trace!("BAN");
|
||||||
ban(stream, query_parts).await
|
ban(stream, query_parts).await
|
||||||
@@ -89,12 +84,7 @@ where
|
|||||||
trace!("SHUTDOWN");
|
trace!("SHUTDOWN");
|
||||||
shutdown(stream).await
|
shutdown(stream).await
|
||||||
}
|
}
|
||||||
"SHOW" => match query_parts
|
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
|
||||||
.get(1)
|
|
||||||
.unwrap_or(&"")
|
|
||||||
.to_ascii_uppercase()
|
|
||||||
.as_str()
|
|
||||||
{
|
|
||||||
"HELP" => {
|
"HELP" => {
|
||||||
trace!("SHOW HELP");
|
trace!("SHOW HELP");
|
||||||
show_help(stream).await
|
show_help(stream).await
|
||||||
|
|||||||
@@ -1729,13 +1729,14 @@ where
|
|||||||
/// and also the pool's statement cache. Add it to extended protocol data.
|
/// and also the pool's statement cache. Add it to extended protocol data.
|
||||||
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
// Avoid parsing if prepared statements not enabled
|
// Avoid parsing if prepared statements not enabled
|
||||||
let client_given_name = Parse::get_name(&message)?;
|
if !self.prepared_statements_enabled {
|
||||||
if !self.prepared_statements_enabled || client_given_name.is_empty() {
|
|
||||||
debug!("Anonymous parse message");
|
debug!("Anonymous parse message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let client_given_name = Parse::get_name(&message)?;
|
||||||
let parse: Parse = (&message).try_into()?;
|
let parse: Parse = (&message).try_into()?;
|
||||||
|
|
||||||
// Compute the hash of the parse statement
|
// Compute the hash of the parse statement
|
||||||
@@ -1773,14 +1774,15 @@ where
|
|||||||
/// saved in the client cache.
|
/// saved in the client cache.
|
||||||
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
||||||
// Avoid parsing if prepared statements not enabled
|
// Avoid parsing if prepared statements not enabled
|
||||||
let client_given_name = Bind::get_name(&message)?;
|
if !self.prepared_statements_enabled {
|
||||||
if !self.prepared_statements_enabled || client_given_name.is_empty() {
|
|
||||||
debug!("Anonymous bind message");
|
debug!("Anonymous bind message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let client_given_name = Bind::get_name(&message)?;
|
||||||
|
|
||||||
match self.prepared_statements.get(&client_given_name) {
|
match self.prepared_statements.get(&client_given_name) {
|
||||||
Some((rewritten_parse, _)) => {
|
Some((rewritten_parse, _)) => {
|
||||||
let message = Bind::rename(message, &rewritten_parse.name)?;
|
let message = Bind::rename(message, &rewritten_parse.name)?;
|
||||||
@@ -1832,8 +1834,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let describe: Describe = (&message).try_into()?;
|
let describe: Describe = (&message).try_into()?;
|
||||||
let client_given_name = describe.statement_name.clone();
|
if describe.target == 'P' {
|
||||||
if describe.target == 'P' || client_given_name.is_empty() {
|
|
||||||
debug!("Portal describe message");
|
debug!("Portal describe message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
||||||
@@ -1841,6 +1842,8 @@ where
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let client_given_name = describe.statement_name.clone();
|
||||||
|
|
||||||
match self.prepared_statements.get(&client_given_name) {
|
match self.prepared_statements.get(&client_given_name) {
|
||||||
Some((rewritten_parse, _)) => {
|
Some((rewritten_parse, _)) => {
|
||||||
let describe = describe.rename(&rewritten_parse.name);
|
let describe = describe.rename(&rewritten_parse.name);
|
||||||
|
|||||||
@@ -38,12 +38,12 @@ pub enum Role {
|
|||||||
Mirror,
|
Mirror,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for Role {
|
impl ToString for Role {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn to_string(&self) -> String {
|
||||||
match self {
|
match *self {
|
||||||
Role::Primary => write!(f, "primary"),
|
Role::Primary => "primary".to_string(),
|
||||||
Role::Replica => write!(f, "replica"),
|
Role::Replica => "replica".to_string(),
|
||||||
Role::Mirror => write!(f, "mirror"),
|
Role::Mirror => "mirror".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -476,11 +476,11 @@ pub enum PoolMode {
|
|||||||
Session,
|
Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for PoolMode {
|
impl ToString for PoolMode {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn to_string(&self) -> String {
|
||||||
match self {
|
match *self {
|
||||||
PoolMode::Transaction => write!(f, "transaction"),
|
PoolMode::Transaction => "transaction".to_string(),
|
||||||
PoolMode::Session => write!(f, "session"),
|
PoolMode::Session => "session".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -493,13 +493,12 @@ pub enum LoadBalancingMode {
|
|||||||
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
|
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
|
||||||
LeastOutstandingConnections,
|
LeastOutstandingConnections,
|
||||||
}
|
}
|
||||||
|
impl ToString for LoadBalancingMode {
|
||||||
impl std::fmt::Display for LoadBalancingMode {
|
fn to_string(&self) -> String {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
match *self {
|
||||||
match self {
|
LoadBalancingMode::Random => "random".to_string(),
|
||||||
LoadBalancingMode::Random => write!(f, "random"),
|
|
||||||
LoadBalancingMode::LeastOutstandingConnections => {
|
LoadBalancingMode::LeastOutstandingConnections => {
|
||||||
write!(f, "least_outstanding_connections")
|
"least_outstanding_connections".to_string()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1000,17 +999,15 @@ impl Config {
|
|||||||
pub fn fill_up_auth_query_config(&mut self) {
|
pub fn fill_up_auth_query_config(&mut self) {
|
||||||
for (_name, pool) in self.pools.iter_mut() {
|
for (_name, pool) in self.pools.iter_mut() {
|
||||||
if pool.auth_query.is_none() {
|
if pool.auth_query.is_none() {
|
||||||
pool.auth_query.clone_from(&self.general.auth_query);
|
pool.auth_query = self.general.auth_query.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
if pool.auth_query_user.is_none() {
|
if pool.auth_query_user.is_none() {
|
||||||
pool.auth_query_user
|
pool.auth_query_user = self.general.auth_query_user.clone();
|
||||||
.clone_from(&self.general.auth_query_user);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if pool.auth_query_password.is_none() {
|
if pool.auth_query_password.is_none() {
|
||||||
pool.auth_query_password
|
pool.auth_query_password = self.general.auth_query_password.clone();
|
||||||
.clone_from(&self.general.auth_query_password);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1158,7 +1155,7 @@ impl Config {
|
|||||||
"Default max server lifetime: {}ms",
|
"Default max server lifetime: {}ms",
|
||||||
self.general.server_lifetime
|
self.general.server_lifetime
|
||||||
);
|
);
|
||||||
info!("Server round robin: {}", self.general.server_round_robin);
|
info!("Sever round robin: {}", self.general.server_round_robin);
|
||||||
match self.general.tls_certificate.clone() {
|
match self.general.tls_certificate.clone() {
|
||||||
Some(tls_certificate) => {
|
Some(tls_certificate) => {
|
||||||
info!("TLS certificate: {}", tls_certificate);
|
info!("TLS certificate: {}", tls_certificate);
|
||||||
|
|||||||
@@ -733,10 +733,6 @@ pub fn configure_socket(stream: &TcpStream) {
|
|||||||
}
|
}
|
||||||
Err(err) => error!("Could not configure socket: {}", err),
|
Err(err) => error!("Could not configure socket: {}", err),
|
||||||
}
|
}
|
||||||
match sock_ref.set_nodelay(true) {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait BytesMutReader {
|
pub trait BytesMutReader {
|
||||||
@@ -821,10 +817,10 @@ impl ExtendedProtocolData {
|
|||||||
pub struct Parse {
|
pub struct Parse {
|
||||||
code: char,
|
code: char,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
len: u32,
|
len: i32,
|
||||||
pub name: String,
|
pub name: String,
|
||||||
query: String,
|
query: String,
|
||||||
num_params: u16,
|
num_params: i16,
|
||||||
param_types: Vec<i32>,
|
param_types: Vec<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -834,11 +830,12 @@ impl TryFrom<&BytesMut> for Parse {
|
|||||||
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
|
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
|
||||||
let mut cursor = Cursor::new(buf);
|
let mut cursor = Cursor::new(buf);
|
||||||
let code = cursor.get_u8() as char;
|
let code = cursor.get_u8() as char;
|
||||||
let len = cursor.get_u32();
|
let len = cursor.get_i32();
|
||||||
let name = cursor.read_string()?;
|
let name = cursor.read_string()?;
|
||||||
let query = cursor.read_string()?;
|
let query = cursor.read_string()?;
|
||||||
let num_params = cursor.get_u16();
|
let num_params = cursor.get_i16();
|
||||||
let mut param_types = Vec::new();
|
let mut param_types = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_params {
|
for _ in 0..num_params {
|
||||||
param_types.push(cursor.get_i32());
|
param_types.push(cursor.get_i32());
|
||||||
}
|
}
|
||||||
@@ -874,10 +871,10 @@ impl TryFrom<Parse> for BytesMut {
|
|||||||
+ 4 * parse.num_params as usize;
|
+ 4 * parse.num_params as usize;
|
||||||
|
|
||||||
bytes.put_u8(parse.code as u8);
|
bytes.put_u8(parse.code as u8);
|
||||||
bytes.put_u32(len as u32);
|
bytes.put_i32(len as i32);
|
||||||
bytes.put_slice(name);
|
bytes.put_slice(name);
|
||||||
bytes.put_slice(query);
|
bytes.put_slice(query);
|
||||||
bytes.put_u16(parse.num_params);
|
bytes.put_i16(parse.num_params);
|
||||||
for param in parse.param_types {
|
for param in parse.param_types {
|
||||||
bytes.put_i32(param);
|
bytes.put_i32(param);
|
||||||
}
|
}
|
||||||
@@ -944,14 +941,14 @@ impl Parse {
|
|||||||
pub struct Bind {
|
pub struct Bind {
|
||||||
code: char,
|
code: char,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
len: u64,
|
len: i64,
|
||||||
portal: String,
|
portal: String,
|
||||||
pub prepared_statement: String,
|
pub prepared_statement: String,
|
||||||
num_param_format_codes: u16,
|
num_param_format_codes: i16,
|
||||||
param_format_codes: Vec<i16>,
|
param_format_codes: Vec<i16>,
|
||||||
num_param_values: u16,
|
num_param_values: i16,
|
||||||
param_values: Vec<(i32, BytesMut)>,
|
param_values: Vec<(i32, BytesMut)>,
|
||||||
num_result_column_format_codes: u16,
|
num_result_column_format_codes: i16,
|
||||||
result_columns_format_codes: Vec<i16>,
|
result_columns_format_codes: Vec<i16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -961,17 +958,17 @@ impl TryFrom<&BytesMut> for Bind {
|
|||||||
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
|
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
|
||||||
let mut cursor = Cursor::new(buf);
|
let mut cursor = Cursor::new(buf);
|
||||||
let code = cursor.get_u8() as char;
|
let code = cursor.get_u8() as char;
|
||||||
let len = cursor.get_u32();
|
let len = cursor.get_i32();
|
||||||
let portal = cursor.read_string()?;
|
let portal = cursor.read_string()?;
|
||||||
let prepared_statement = cursor.read_string()?;
|
let prepared_statement = cursor.read_string()?;
|
||||||
let num_param_format_codes = cursor.get_u16();
|
let num_param_format_codes = cursor.get_i16();
|
||||||
let mut param_format_codes = Vec::new();
|
let mut param_format_codes = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_param_format_codes {
|
for _ in 0..num_param_format_codes {
|
||||||
param_format_codes.push(cursor.get_i16());
|
param_format_codes.push(cursor.get_i16());
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_param_values = cursor.get_u16();
|
let num_param_values = cursor.get_i16();
|
||||||
let mut param_values = Vec::new();
|
let mut param_values = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_param_values {
|
for _ in 0..num_param_values {
|
||||||
@@ -993,7 +990,7 @@ impl TryFrom<&BytesMut> for Bind {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_result_column_format_codes = cursor.get_u16();
|
let num_result_column_format_codes = cursor.get_i16();
|
||||||
let mut result_columns_format_codes = Vec::new();
|
let mut result_columns_format_codes = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_result_column_format_codes {
|
for _ in 0..num_result_column_format_codes {
|
||||||
@@ -1002,7 +999,7 @@ impl TryFrom<&BytesMut> for Bind {
|
|||||||
|
|
||||||
Ok(Bind {
|
Ok(Bind {
|
||||||
code,
|
code,
|
||||||
len: len as u64,
|
len: len as i64,
|
||||||
portal,
|
portal,
|
||||||
prepared_statement,
|
prepared_statement,
|
||||||
num_param_format_codes,
|
num_param_format_codes,
|
||||||
@@ -1041,19 +1038,19 @@ impl TryFrom<Bind> for BytesMut {
|
|||||||
len += 2 * bind.num_result_column_format_codes as usize;
|
len += 2 * bind.num_result_column_format_codes as usize;
|
||||||
|
|
||||||
bytes.put_u8(bind.code as u8);
|
bytes.put_u8(bind.code as u8);
|
||||||
bytes.put_u32(len as u32);
|
bytes.put_i32(len as i32);
|
||||||
bytes.put_slice(portal);
|
bytes.put_slice(portal);
|
||||||
bytes.put_slice(prepared_statement);
|
bytes.put_slice(prepared_statement);
|
||||||
bytes.put_u16(bind.num_param_format_codes);
|
bytes.put_i16(bind.num_param_format_codes);
|
||||||
for param_format_code in bind.param_format_codes {
|
for param_format_code in bind.param_format_codes {
|
||||||
bytes.put_i16(param_format_code);
|
bytes.put_i16(param_format_code);
|
||||||
}
|
}
|
||||||
bytes.put_u16(bind.num_param_values);
|
bytes.put_i16(bind.num_param_values);
|
||||||
for (param_len, param) in bind.param_values {
|
for (param_len, param) in bind.param_values {
|
||||||
bytes.put_i32(param_len);
|
bytes.put_i32(param_len);
|
||||||
bytes.put_slice(¶m);
|
bytes.put_slice(¶m);
|
||||||
}
|
}
|
||||||
bytes.put_u16(bind.num_result_column_format_codes);
|
bytes.put_i16(bind.num_result_column_format_codes);
|
||||||
for result_column_format_code in bind.result_columns_format_codes {
|
for result_column_format_code in bind.result_columns_format_codes {
|
||||||
bytes.put_i16(result_column_format_code);
|
bytes.put_i16(result_column_format_code);
|
||||||
}
|
}
|
||||||
@@ -1067,7 +1064,7 @@ impl Bind {
|
|||||||
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
|
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
|
||||||
let mut cursor = Cursor::new(buf);
|
let mut cursor = Cursor::new(buf);
|
||||||
// Skip the code and length
|
// Skip the code and length
|
||||||
cursor.advance(mem::size_of::<u8>() + mem::size_of::<u32>());
|
cursor.advance(mem::size_of::<u8>() + mem::size_of::<i32>());
|
||||||
cursor.read_string()?;
|
cursor.read_string()?;
|
||||||
cursor.read_string()
|
cursor.read_string()
|
||||||
}
|
}
|
||||||
@@ -1077,17 +1074,17 @@ impl Bind {
|
|||||||
let mut cursor = Cursor::new(&buf);
|
let mut cursor = Cursor::new(&buf);
|
||||||
// Read basic data from the cursor
|
// Read basic data from the cursor
|
||||||
let code = cursor.get_u8();
|
let code = cursor.get_u8();
|
||||||
let current_len = cursor.get_u32();
|
let current_len = cursor.get_i32();
|
||||||
let portal = cursor.read_string()?;
|
let portal = cursor.read_string()?;
|
||||||
let prepared_statement = cursor.read_string()?;
|
let prepared_statement = cursor.read_string()?;
|
||||||
|
|
||||||
// Calculate new length
|
// Calculate new length
|
||||||
let new_len = current_len + new_name.len() as u32 - prepared_statement.len() as u32;
|
let new_len = current_len + new_name.len() as i32 - prepared_statement.len() as i32;
|
||||||
|
|
||||||
// Begin building the response buffer
|
// Begin building the response buffer
|
||||||
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
|
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
|
||||||
response_buf.put_u8(code);
|
response_buf.put_u8(code);
|
||||||
response_buf.put_u32(new_len);
|
response_buf.put_i32(new_len);
|
||||||
|
|
||||||
// Put the portal and new name into the buffer
|
// Put the portal and new name into the buffer
|
||||||
// Note: panic if the provided string contains null byte
|
// Note: panic if the provided string contains null byte
|
||||||
@@ -1111,7 +1108,7 @@ pub struct Describe {
|
|||||||
code: char,
|
code: char,
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
len: u32,
|
len: i32,
|
||||||
pub target: char,
|
pub target: char,
|
||||||
pub statement_name: String,
|
pub statement_name: String,
|
||||||
}
|
}
|
||||||
@@ -1122,7 +1119,7 @@ impl TryFrom<&BytesMut> for Describe {
|
|||||||
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
|
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
|
||||||
let mut cursor = Cursor::new(bytes);
|
let mut cursor = Cursor::new(bytes);
|
||||||
let code = cursor.get_u8() as char;
|
let code = cursor.get_u8() as char;
|
||||||
let len = cursor.get_u32();
|
let len = cursor.get_i32();
|
||||||
let target = cursor.get_u8() as char;
|
let target = cursor.get_u8() as char;
|
||||||
let statement_name = cursor.read_string()?;
|
let statement_name = cursor.read_string()?;
|
||||||
|
|
||||||
@@ -1145,7 +1142,7 @@ impl TryFrom<Describe> for BytesMut {
|
|||||||
let len = 4 + 1 + statement_name.len();
|
let len = 4 + 1 + statement_name.len();
|
||||||
|
|
||||||
bytes.put_u8(describe.code as u8);
|
bytes.put_u8(describe.code as u8);
|
||||||
bytes.put_u32(len as u32);
|
bytes.put_i32(len as i32);
|
||||||
bytes.put_u8(describe.target as u8);
|
bytes.put_u8(describe.target as u8);
|
||||||
bytes.put_slice(statement_name);
|
bytes.put_slice(statement_name);
|
||||||
|
|
||||||
|
|||||||
@@ -1,41 +1,23 @@
|
|||||||
use http_body_util::Full;
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
use hyper::body;
|
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||||
use hyper::body::Bytes;
|
|
||||||
|
|
||||||
use hyper::server::conn::http1;
|
|
||||||
use hyper::service::service_fn;
|
|
||||||
use hyper::{Method, Request, Response, StatusCode};
|
|
||||||
use hyper_util::rt::TokioIo;
|
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use phf::phf_map;
|
use phf::phf_map;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use tokio::net::TcpListener;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::config::Address;
|
use crate::config::Address;
|
||||||
use crate::pool::{get_all_pools, PoolIdentifier};
|
use crate::pool::{get_all_pools, PoolIdentifier};
|
||||||
use crate::stats::get_server_stats;
|
|
||||||
use crate::stats::pool::PoolStats;
|
use crate::stats::pool::PoolStats;
|
||||||
|
use crate::stats::{get_server_stats, ServerStats};
|
||||||
|
|
||||||
struct MetricHelpType {
|
struct MetricHelpType {
|
||||||
help: &'static str,
|
help: &'static str,
|
||||||
ty: &'static str,
|
ty: &'static str,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ServerPrometheusStats {
|
|
||||||
bytes_received: u64,
|
|
||||||
bytes_sent: u64,
|
|
||||||
transaction_count: u64,
|
|
||||||
query_count: u64,
|
|
||||||
error_count: u64,
|
|
||||||
active_count: u64,
|
|
||||||
idle_count: u64,
|
|
||||||
login_count: u64,
|
|
||||||
tested_count: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
|
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
|
||||||
// counters only increase
|
// counters only increase
|
||||||
// gauges can arbitrarily increase or decrease
|
// gauges can arbitrarily increase or decrease
|
||||||
@@ -138,46 +120,22 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
|
|||||||
},
|
},
|
||||||
"servers_bytes_received" => MetricHelpType {
|
"servers_bytes_received" => MetricHelpType {
|
||||||
help: "Volume in bytes of network traffic received by server",
|
help: "Volume in bytes of network traffic received by server",
|
||||||
ty: "counter",
|
ty: "gauge",
|
||||||
},
|
},
|
||||||
"servers_bytes_sent" => MetricHelpType {
|
"servers_bytes_sent" => MetricHelpType {
|
||||||
help: "Volume in bytes of network traffic sent by server",
|
help: "Volume in bytes of network traffic sent by server",
|
||||||
ty: "counter",
|
ty: "gauge",
|
||||||
},
|
},
|
||||||
"servers_transaction_count" => MetricHelpType {
|
"servers_transaction_count" => MetricHelpType {
|
||||||
help: "Number of transactions executed by server",
|
help: "Number of transactions executed by server",
|
||||||
ty: "counter",
|
ty: "gauge",
|
||||||
},
|
},
|
||||||
"servers_query_count" => MetricHelpType {
|
"servers_query_count" => MetricHelpType {
|
||||||
help: "Number of queries executed by server",
|
help: "Number of queries executed by server",
|
||||||
ty: "counter",
|
ty: "gauge",
|
||||||
},
|
},
|
||||||
"servers_error_count" => MetricHelpType {
|
"servers_error_count" => MetricHelpType {
|
||||||
help: "Number of errors",
|
help: "Number of errors",
|
||||||
ty: "counter",
|
|
||||||
},
|
|
||||||
"servers_idle_count" => MetricHelpType {
|
|
||||||
help: "Number of server connection in idle state",
|
|
||||||
ty: "gauge",
|
|
||||||
},
|
|
||||||
"servers_active_count" => MetricHelpType {
|
|
||||||
help: "Number of server connection in active state",
|
|
||||||
ty: "gauge",
|
|
||||||
},
|
|
||||||
"servers_tested_count" => MetricHelpType {
|
|
||||||
help: "Number of server connection in tested state",
|
|
||||||
ty: "gauge",
|
|
||||||
},
|
|
||||||
"servers_login_count" => MetricHelpType {
|
|
||||||
help: "Number of server connection in login state",
|
|
||||||
ty: "gauge",
|
|
||||||
},
|
|
||||||
"servers_is_banned" => MetricHelpType {
|
|
||||||
help: "0 if server is not banned, 1 if server is banned",
|
|
||||||
ty: "gauge",
|
|
||||||
},
|
|
||||||
"servers_is_paused" => MetricHelpType {
|
|
||||||
help: "0 if server is not paused, 1 if server is paused",
|
|
||||||
ty: "gauge",
|
ty: "gauge",
|
||||||
},
|
},
|
||||||
"databases_pool_size" => MetricHelpType {
|
"databases_pool_size" => MetricHelpType {
|
||||||
@@ -245,9 +203,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("shard", address.shard.to_string());
|
labels.insert("shard", address.shard.to_string());
|
||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("index", address.address_index.to_string());
|
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("user", address.username.clone());
|
|
||||||
|
|
||||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -262,9 +218,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("shard", address.shard.to_string());
|
labels.insert("shard", address.shard.to_string());
|
||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("index", address.address_index.to_string());
|
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("user", address.username.clone());
|
|
||||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -274,9 +229,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("shard", address.shard.to_string());
|
labels.insert("shard", address.shard.to_string());
|
||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
labels.insert("index", address.address_index.to_string());
|
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("user", address.username.clone());
|
|
||||||
|
|
||||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -290,9 +243,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn prometheus_stats(
|
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
|
||||||
request: Request<body::Incoming>,
|
|
||||||
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
|
|
||||||
match (request.method(), request.uri().path()) {
|
match (request.method(), request.uri().path()) {
|
||||||
(&Method::GET, "/metrics") => {
|
(&Method::GET, "/metrics") => {
|
||||||
let mut lines = Vec::new();
|
let mut lines = Vec::new();
|
||||||
@@ -378,51 +329,34 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
|||||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||||
fn push_server_stats(lines: &mut Vec<String>) {
|
fn push_server_stats(lines: &mut Vec<String>) {
|
||||||
let server_stats = get_server_stats();
|
let server_stats = get_server_stats();
|
||||||
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
|
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
|
||||||
for (_, stats) in server_stats {
|
for (_, stats) in server_stats {
|
||||||
let entry = prom_stats
|
server_stats_by_addresses.insert(stats.address_name(), stats);
|
||||||
.entry(stats.address_name())
|
|
||||||
.or_insert(ServerPrometheusStats {
|
|
||||||
bytes_received: 0,
|
|
||||||
bytes_sent: 0,
|
|
||||||
transaction_count: 0,
|
|
||||||
query_count: 0,
|
|
||||||
error_count: 0,
|
|
||||||
active_count: 0,
|
|
||||||
idle_count: 0,
|
|
||||||
login_count: 0,
|
|
||||||
tested_count: 0,
|
|
||||||
});
|
|
||||||
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
|
|
||||||
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
|
|
||||||
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
|
|
||||||
entry.query_count += stats.query_count.load(Ordering::Relaxed);
|
|
||||||
entry.error_count += stats.error_count.load(Ordering::Relaxed);
|
|
||||||
match stats.state.load(Ordering::Relaxed) {
|
|
||||||
crate::stats::ServerState::Login => entry.login_count += 1,
|
|
||||||
crate::stats::ServerState::Active => entry.active_count += 1,
|
|
||||||
crate::stats::ServerState::Tested => entry.tested_count += 1,
|
|
||||||
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
let address = pool.address(shard, server);
|
let address = pool.address(shard, server);
|
||||||
if let Some(server_info) = prom_stats.get(&address.name()) {
|
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
|
||||||
let metrics = [
|
let metrics = [
|
||||||
("bytes_received", server_info.bytes_received),
|
(
|
||||||
("bytes_sent", server_info.bytes_sent),
|
"bytes_received",
|
||||||
("transaction_count", server_info.transaction_count),
|
server_info.bytes_received.load(Ordering::Relaxed),
|
||||||
("query_count", server_info.query_count),
|
),
|
||||||
("error_count", server_info.error_count),
|
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
|
||||||
("idle_count", server_info.idle_count),
|
(
|
||||||
("active_count", server_info.active_count),
|
"transaction_count",
|
||||||
("login_count", server_info.login_count),
|
server_info.transaction_count.load(Ordering::Relaxed),
|
||||||
("tested_count", server_info.tested_count),
|
),
|
||||||
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
|
(
|
||||||
("is_paused", if pool.paused() { 1 } else { 0 }),
|
"query_count",
|
||||||
|
server_info.query_count.load(Ordering::Relaxed),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"error_count",
|
||||||
|
server_info.error_count.load(Ordering::Relaxed),
|
||||||
|
),
|
||||||
];
|
];
|
||||||
for (key, value) in metrics {
|
for (key, value) in metrics {
|
||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
@@ -440,35 +374,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||||
let listener = TcpListener::bind(http_addr);
|
let http_service_factory =
|
||||||
let listener = match listener.await {
|
make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) });
|
||||||
Ok(listener) => listener,
|
let server = Server::bind(&http_addr).serve(http_service_factory);
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to bind prometheus server to HTTP address: {}.", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
info!(
|
info!(
|
||||||
"Exposing prometheus metrics on http://{}/metrics.",
|
"Exposing prometheus metrics on http://{}/metrics.",
|
||||||
http_addr
|
http_addr
|
||||||
);
|
);
|
||||||
loop {
|
if let Err(e) = server.await {
|
||||||
let stream = match listener.accept().await {
|
error!("Failed to run HTTP server: {}.", e);
|
||||||
Ok((stream, _)) => stream,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Error accepting connection: {}", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let io = TokioIo::new(stream);
|
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
if let Err(err) = http1::Builder::new()
|
|
||||||
.serve_connection(io, service_fn(prometheus_stats))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
eprintln!("Error serving HTTP connection for metrics: {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -427,12 +427,8 @@ impl QueryRouter {
|
|||||||
None => (),
|
None => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
let has_locks = !query.locks.is_empty();
|
|
||||||
|
|
||||||
if has_locks {
|
|
||||||
self.active_role = Some(Role::Primary);
|
|
||||||
} else if !visited_write_statement {
|
|
||||||
// If we already visited a write statement, we should be going to the primary.
|
// If we already visited a write statement, we should be going to the primary.
|
||||||
|
if !visited_write_statement {
|
||||||
self.active_role = match self.primary_reads_enabled() {
|
self.active_role = match self.primary_reads_enabled() {
|
||||||
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
|
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
|
||||||
true => None, // Any server role is fine in this case.
|
true => None, // Any server role is fine in this case.
|
||||||
@@ -1162,29 +1158,6 @@ mod test {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_select_for_update() {
|
|
||||||
QueryRouter::setup();
|
|
||||||
let mut qr = QueryRouter::new();
|
|
||||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
|
||||||
|
|
||||||
let queries_in_primary_role = vec![
|
|
||||||
simple_query("BEGIN"), // Transaction start
|
|
||||||
simple_query("SELECT * FROM items WHERE id = 5 FOR UPDATE"),
|
|
||||||
simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
|
|
||||||
];
|
|
||||||
|
|
||||||
for query in queries_in_primary_role {
|
|
||||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
|
||||||
assert_eq!(qr.role(), Some(Role::Primary));
|
|
||||||
}
|
|
||||||
|
|
||||||
// query without lock do not change role
|
|
||||||
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
|
||||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
|
||||||
assert_eq!(qr.role(), None);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_infer_primary_reads_enabled() {
|
fn test_infer_primary_reads_enabled() {
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
@@ -1399,19 +1372,6 @@ mod test {
|
|||||||
assert!(!qr.query_parser_enabled());
|
assert!(!qr.query_parser_enabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_query_parser() {
|
|
||||||
QueryRouter::setup();
|
|
||||||
let mut qr = QueryRouter::new();
|
|
||||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
|
||||||
|
|
||||||
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
|
|
||||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
|
||||||
|
|
||||||
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
|
|
||||||
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_update_from_pool_settings() {
|
fn test_update_from_pool_settings() {
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
|
|||||||
@@ -698,6 +698,7 @@ impl Server {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!("Error: {}", error_code);
|
trace!("Error: {}", error_code);
|
||||||
|
|
||||||
match error_code {
|
match error_code {
|
||||||
@@ -1012,12 +1013,6 @@ impl Server {
|
|||||||
// which can leak between clients. This is a best effort to block bad clients
|
// which can leak between clients. This is a best effort to block bad clients
|
||||||
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
||||||
match command.as_str() {
|
match command.as_str() {
|
||||||
"DISCARD ALL" => {
|
|
||||||
self.clear_prepared_statement_cache();
|
|
||||||
}
|
|
||||||
"DEALLOCATE ALL" => {
|
|
||||||
self.clear_prepared_statement_cache();
|
|
||||||
}
|
|
||||||
"SET" => {
|
"SET" => {
|
||||||
// We don't detect set statements in transactions
|
// We don't detect set statements in transactions
|
||||||
// No great way to differentiate between set and set local
|
// No great way to differentiate between set and set local
|
||||||
@@ -1137,12 +1132,6 @@ impl Server {
|
|||||||
has_it
|
has_it
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_prepared_statement_cache(&mut self) {
|
|
||||||
if let Some(cache) = &mut self.prepared_statement_cache {
|
|
||||||
cache.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
||||||
let cache = match &mut self.prepared_statement_cache {
|
let cache = match &mut self.prepared_statement_cache {
|
||||||
Some(cache) => cache,
|
Some(cache) => cache,
|
||||||
|
|||||||
@@ -14,11 +14,11 @@ pub enum ShardingFunction {
|
|||||||
Sha1,
|
Sha1,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for ShardingFunction {
|
impl ToString for ShardingFunction {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn to_string(&self) -> String {
|
||||||
match self {
|
match *self {
|
||||||
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"),
|
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
|
||||||
ShardingFunction::Sha1 => write!(f, "sha1"),
|
ShardingFunction::Sha1 => "sha1".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,34 +0,0 @@
|
|||||||
GREEN="\033[0;32m"
|
|
||||||
RED="\033[0;31m"
|
|
||||||
BLUE="\033[0;34m"
|
|
||||||
RESET="\033[0m"
|
|
||||||
|
|
||||||
|
|
||||||
cd tests/docker/
|
|
||||||
docker compose kill main || true
|
|
||||||
docker compose build main
|
|
||||||
docker compose down
|
|
||||||
docker compose up -d
|
|
||||||
# wait for the container to start
|
|
||||||
while ! docker compose exec main ls; do
|
|
||||||
echo "Waiting for test environment to start"
|
|
||||||
sleep 1
|
|
||||||
done
|
|
||||||
echo "==================================="
|
|
||||||
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
|
|
||||||
docker compose exec --workdir /app main cargo build
|
|
||||||
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
|
|
||||||
docker compose exec --workdir /app/tests/ruby main bundle install
|
|
||||||
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
|
|
||||||
echo "Interactive test environment ready"
|
|
||||||
echo "To run integration tests, you can use the following commands:"
|
|
||||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
|
||||||
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
|
|
||||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
|
||||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
|
||||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
|
||||||
echo "You can rebuild PgCat from within the container by running"
|
|
||||||
echo -e " ${GREEN}cargo build${RESET}"
|
|
||||||
echo "and then run the tests again"
|
|
||||||
echo "==================================="
|
|
||||||
docker compose exec --workdir /app/tests main bash
|
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
version: "3"
|
||||||
services:
|
services:
|
||||||
pg1:
|
pg1:
|
||||||
image: postgres:14
|
image: postgres:14
|
||||||
@@ -47,8 +48,6 @@ services:
|
|||||||
main:
|
main:
|
||||||
build: .
|
build: .
|
||||||
command: ["bash", "/app/tests/docker/run.sh"]
|
command: ["bash", "/app/tests/docker/run.sh"]
|
||||||
environment:
|
|
||||||
- INTERACTIVE_TEST_ENVIRONMENT=true
|
|
||||||
volumes:
|
volumes:
|
||||||
- ../../:/app/
|
- ../../:/app/
|
||||||
- /app/target/
|
- /app/target/
|
||||||
|
|||||||
@@ -5,38 +5,6 @@ rm /app/*.profraw || true
|
|||||||
rm /app/pgcat.profdata || true
|
rm /app/pgcat.profdata || true
|
||||||
rm -rf /app/cov || true
|
rm -rf /app/cov || true
|
||||||
|
|
||||||
# Prepares the interactive test environment
|
|
||||||
#
|
|
||||||
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
|
|
||||||
ports=(5432 7432 8432 9432 10432)
|
|
||||||
for port in "${ports[@]}"; do
|
|
||||||
is_it_up=0
|
|
||||||
attempts=0
|
|
||||||
while [ $is_it_up -eq 0 ]; do
|
|
||||||
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
|
|
||||||
if [ $? -eq 0 ]; then
|
|
||||||
echo "PostgreSQL on port $port is up."
|
|
||||||
is_it_up=1
|
|
||||||
else
|
|
||||||
attempts=$((attempts+1))
|
|
||||||
if [ $attempts -gt 10 ]; then
|
|
||||||
echo "PostgreSQL on port $port is down, giving up."
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
echo "PostgreSQL on port $port is down, waiting for it to start."
|
|
||||||
sleep 1
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
done
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
sleep 100000000000000000
|
|
||||||
exit 0
|
|
||||||
fi
|
|
||||||
|
|
||||||
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
||||||
export RUSTC_BOOTSTRAP=1
|
export RUSTC_BOOTSTRAP=1
|
||||||
export CARGO_INCREMENTAL=0
|
export CARGO_INCREMENTAL=0
|
||||||
|
|||||||
@@ -91,27 +91,6 @@ describe "Admin" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
[
|
|
||||||
"SHOW ME THE MONEY",
|
|
||||||
"SHOW ME THE WAY",
|
|
||||||
"SHOW UP",
|
|
||||||
"SHOWTIME",
|
|
||||||
"HAMMER TIME",
|
|
||||||
"SHOWN TO BE TRUE",
|
|
||||||
"SHOW ",
|
|
||||||
"SHOW ",
|
|
||||||
"SHOW 1",
|
|
||||||
";;;;;"
|
|
||||||
].each do |cmd|
|
|
||||||
describe "Bad command #{cmd}" do
|
|
||||||
it "does not panic and responds with PG::SystemError" do
|
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
|
||||||
expect { admin_conn.async_exec(cmd) }.to raise_error(PG::SystemError).with_message(/Unsupported/)
|
|
||||||
admin_conn.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "PAUSE" do
|
describe "PAUSE" do
|
||||||
it "pauses all pools" do
|
it "pauses all pools" do
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
|
|||||||
@@ -1,145 +0,0 @@
|
|||||||
|
|
||||||
class PostgresMessage
|
|
||||||
# Base class for common functionality
|
|
||||||
|
|
||||||
def encode_string(str)
|
|
||||||
"#{str}\0" # Encode a string with a null terminator
|
|
||||||
end
|
|
||||||
|
|
||||||
def encode_int16(value)
|
|
||||||
[value].pack('n') # Encode an Int16
|
|
||||||
end
|
|
||||||
|
|
||||||
def encode_int32(value)
|
|
||||||
[value].pack('N') # Encode an Int32
|
|
||||||
end
|
|
||||||
|
|
||||||
def message_prefix(type, length)
|
|
||||||
"#{type}#{encode_int32(length)}" # Message type and length prefix
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class SimpleQueryMessage < PostgresMessage
|
|
||||||
attr_accessor :query
|
|
||||||
|
|
||||||
def initialize(query = "")
|
|
||||||
@query = query
|
|
||||||
end
|
|
||||||
|
|
||||||
def to_bytes
|
|
||||||
query_bytes = encode_string(@query)
|
|
||||||
length = 4 + query_bytes.size # Length includes 4 bytes for length itself
|
|
||||||
message_prefix('Q', length) + query_bytes
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class ParseMessage < PostgresMessage
|
|
||||||
attr_accessor :statement_name, :query, :parameter_types
|
|
||||||
|
|
||||||
def initialize(statement_name = "", query = "", parameter_types = [])
|
|
||||||
@statement_name = statement_name
|
|
||||||
@query = query
|
|
||||||
@parameter_types = parameter_types
|
|
||||||
end
|
|
||||||
|
|
||||||
def to_bytes
|
|
||||||
statement_name_bytes = encode_string(@statement_name)
|
|
||||||
query_bytes = encode_string(@query)
|
|
||||||
parameter_types_bytes = @parameter_types.pack('N*')
|
|
||||||
|
|
||||||
length = 4 + statement_name_bytes.size + query_bytes.size + 2 + parameter_types_bytes.size
|
|
||||||
message_prefix('P', length) + statement_name_bytes + query_bytes + encode_int16(@parameter_types.size) + parameter_types_bytes
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class BindMessage < PostgresMessage
|
|
||||||
attr_accessor :portal_name, :statement_name, :parameter_format_codes, :parameters, :result_column_format_codes
|
|
||||||
|
|
||||||
def initialize(portal_name = "", statement_name = "", parameter_format_codes = [], parameters = [], result_column_format_codes = [])
|
|
||||||
@portal_name = portal_name
|
|
||||||
@statement_name = statement_name
|
|
||||||
@parameter_format_codes = parameter_format_codes
|
|
||||||
@parameters = parameters
|
|
||||||
@result_column_format_codes = result_column_format_codes
|
|
||||||
end
|
|
||||||
|
|
||||||
def to_bytes
|
|
||||||
portal_name_bytes = encode_string(@portal_name)
|
|
||||||
statement_name_bytes = encode_string(@statement_name)
|
|
||||||
parameter_format_codes_bytes = @parameter_format_codes.pack('n*')
|
|
||||||
|
|
||||||
parameters_bytes = @parameters.map do |param|
|
|
||||||
if param.nil?
|
|
||||||
encode_int32(-1)
|
|
||||||
else
|
|
||||||
encode_int32(param.bytesize) + param
|
|
||||||
end
|
|
||||||
end.join
|
|
||||||
|
|
||||||
result_column_format_codes_bytes = @result_column_format_codes.pack('n*')
|
|
||||||
|
|
||||||
length = 4 + portal_name_bytes.size + statement_name_bytes.size + 2 + parameter_format_codes_bytes.size + 2 + parameters_bytes.size + 2 + result_column_format_codes_bytes.size
|
|
||||||
message_prefix('B', length) + portal_name_bytes + statement_name_bytes + encode_int16(@parameter_format_codes.size) + parameter_format_codes_bytes + encode_int16(@parameters.size) + parameters_bytes + encode_int16(@result_column_format_codes.size) + result_column_format_codes_bytes
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class DescribeMessage < PostgresMessage
|
|
||||||
attr_accessor :type, :name
|
|
||||||
|
|
||||||
def initialize(type = 'S', name = "")
|
|
||||||
@type = type
|
|
||||||
@name = name
|
|
||||||
end
|
|
||||||
|
|
||||||
def to_bytes
|
|
||||||
name_bytes = encode_string(@name)
|
|
||||||
length = 4 + 1 + name_bytes.size
|
|
||||||
message_prefix('D', length) + @type + name_bytes
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
class ExecuteMessage < PostgresMessage
|
|
||||||
attr_accessor :portal_name, :max_rows
|
|
||||||
|
|
||||||
def initialize(portal_name = "", max_rows = 0)
|
|
||||||
@portal_name = portal_name
|
|
||||||
@max_rows = max_rows
|
|
||||||
end
|
|
||||||
|
|
||||||
def to_bytes
|
|
||||||
portal_name_bytes = encode_string(@portal_name)
|
|
||||||
length = 4 + portal_name_bytes.size + 4
|
|
||||||
message_prefix('E', length) + portal_name_bytes + encode_int32(@max_rows)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class FlushMessage < PostgresMessage
|
|
||||||
def to_bytes
|
|
||||||
length = 4
|
|
||||||
message_prefix('H', length)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class SyncMessage < PostgresMessage
|
|
||||||
def to_bytes
|
|
||||||
length = 4
|
|
||||||
message_prefix('S', length)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
class CloseMessage < PostgresMessage
|
|
||||||
attr_accessor :type, :name
|
|
||||||
|
|
||||||
def initialize(type = 'S', name = "")
|
|
||||||
@type = type
|
|
||||||
@name = name
|
|
||||||
end
|
|
||||||
|
|
||||||
def to_bytes
|
|
||||||
name_bytes = encode_string(@name)
|
|
||||||
length = 4 + 1 + name_bytes.size
|
|
||||||
message_prefix('C', length) + @type + name_bytes
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
require 'socket'
|
require 'socket'
|
||||||
require 'digest/md5'
|
require 'digest/md5'
|
||||||
require_relative 'frontend_messages'
|
|
||||||
|
|
||||||
BACKEND_MESSAGE_CODES = {
|
BACKEND_MESSAGE_CODES = {
|
||||||
'Z' => "ReadyForQuery",
|
'Z' => "ReadyForQuery",
|
||||||
@@ -19,11 +18,7 @@ class PostgresSocket
|
|||||||
@host = host
|
@host = host
|
||||||
@socket = TCPSocket.new @host, @port
|
@socket = TCPSocket.new @host, @port
|
||||||
@parameters = {}
|
@parameters = {}
|
||||||
@verbose = false
|
@verbose = true
|
||||||
end
|
|
||||||
|
|
||||||
def send_message(message)
|
|
||||||
@socket.write(message.to_bytes)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def send_md5_password_message(username, password, salt)
|
def send_md5_password_message(username, password, salt)
|
||||||
@@ -118,6 +113,107 @@ class PostgresSocket
|
|||||||
log "[F] Sent CancelRequest message"
|
log "[F] Sent CancelRequest message"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def send_query_message(query)
|
||||||
|
query_size = query.length
|
||||||
|
message_size = 1 + 4 + query_size
|
||||||
|
message = []
|
||||||
|
message << "Q".ord
|
||||||
|
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||||
|
message << query.split('').map(&:ord) # 2, 11
|
||||||
|
message << 0 # 1, 12
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent Q message (#{query})"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_parse_message(query)
|
||||||
|
query_size = query.length
|
||||||
|
message_size = 2 + 2 + 4 + query_size
|
||||||
|
message = []
|
||||||
|
message << "P".ord
|
||||||
|
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||||
|
message << 0 # unnamed statement
|
||||||
|
message << query.split('').map(&:ord) # 2, 11
|
||||||
|
message << 0 # 1, 12
|
||||||
|
message << [0, 0]
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent P message (#{query})"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_bind_message
|
||||||
|
message = []
|
||||||
|
message << "B".ord
|
||||||
|
message << [12].pack('l>').unpack('CCCC') # 4
|
||||||
|
message << 0 # unnamed statement
|
||||||
|
message << 0 # unnamed statement
|
||||||
|
message << [0, 0] # 2
|
||||||
|
message << [0, 0] # 2
|
||||||
|
message << [0, 0] # 2
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent B message"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_describe_message(mode)
|
||||||
|
message = []
|
||||||
|
message << "D".ord
|
||||||
|
message << [6].pack('l>').unpack('CCCC') # 4
|
||||||
|
message << mode.ord
|
||||||
|
message << 0 # unnamed statement
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent D message"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_execute_message(limit=0)
|
||||||
|
message = []
|
||||||
|
message << "E".ord
|
||||||
|
message << [9].pack('l>').unpack('CCCC') # 4
|
||||||
|
message << 0 # unnamed statement
|
||||||
|
message << [limit].pack('l>').unpack('CCCC') # 4
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent E message"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_sync_message
|
||||||
|
message = []
|
||||||
|
message << "S".ord
|
||||||
|
message << [4].pack('l>').unpack('CCCC') # 4
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent S message"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_copydone_message
|
||||||
|
message = []
|
||||||
|
message << "c".ord
|
||||||
|
message << [4].pack('l>').unpack('CCCC') # 4
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent c message"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_copyfail_message
|
||||||
|
message = []
|
||||||
|
message << "f".ord
|
||||||
|
message << [5].pack('l>').unpack('CCCC') # 4
|
||||||
|
message << 0
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent f message"
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_flush_message
|
||||||
|
message = []
|
||||||
|
message << "H".ord
|
||||||
|
message << [4].pack('l>').unpack('CCCC') # 4
|
||||||
|
message.flatten!
|
||||||
|
@socket.write(message.flatten.pack('C*'))
|
||||||
|
log "[F] Sent H message"
|
||||||
|
end
|
||||||
|
|
||||||
def read_from_server()
|
def read_from_server()
|
||||||
output_messages = []
|
output_messages = []
|
||||||
retry_count = 0
|
retry_count = 0
|
||||||
|
|||||||
@@ -16,14 +16,10 @@ describe "Portocol handling" do
|
|||||||
end
|
end
|
||||||
|
|
||||||
def run_comparison(sequence, socket_a, socket_b)
|
def run_comparison(sequence, socket_a, socket_b)
|
||||||
sequence.each do |msg|
|
sequence.each do |msg, *args|
|
||||||
if msg.is_a?(Symbol)
|
socket_a.send(msg, *args)
|
||||||
socket_a.send(msg)
|
socket_b.send(msg, *args)
|
||||||
socket_b.send(msg)
|
|
||||||
else
|
|
||||||
socket_a.send_message(msg)
|
|
||||||
socket_b.send_message(msg)
|
|
||||||
end
|
|
||||||
compare_messages(
|
compare_messages(
|
||||||
socket_a.read_from_server,
|
socket_a.read_from_server,
|
||||||
socket_b.read_from_server
|
socket_b.read_from_server
|
||||||
@@ -88,8 +84,8 @@ describe "Portocol handling" do
|
|||||||
context "Cancel Query" do
|
context "Cancel Query" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[
|
||||||
SimpleQueryMessage.new("SELECT pg_sleep(5)"),
|
[:send_query_message, "SELECT pg_sleep(5)"],
|
||||||
:cancel_query
|
[:cancel_query]
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,12 +95,12 @@ describe "Portocol handling" do
|
|||||||
xcontext "Simple query after parse" do
|
xcontext "Simple query after parse" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[
|
||||||
ParseMessage.new("", "SELECT 5", []),
|
[:send_parse_message, "SELECT 5"],
|
||||||
SimpleQueryMessage.new("SELECT 1"),
|
[:send_query_message, "SELECT 1"],
|
||||||
BindMessage.new("", "", [], [], [0]),
|
[:send_bind_message],
|
||||||
DescribeMessage.new("P", ""),
|
[:send_describe_message, "P"],
|
||||||
ExecuteMessage.new("", 1),
|
[:send_execute_message],
|
||||||
SyncMessage.new
|
[:send_sync_message],
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,8 +111,8 @@ describe "Portocol handling" do
|
|||||||
xcontext "Flush message" do
|
xcontext "Flush message" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[
|
||||||
ParseMessage.new("", "SELECT 1", []),
|
[:send_parse_message, "SELECT 1"],
|
||||||
FlushMessage.new
|
[:send_flush_message]
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,7 +122,9 @@ describe "Portocol handling" do
|
|||||||
|
|
||||||
xcontext "Bind without parse" do
|
xcontext "Bind without parse" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[BindMessage.new("", "", [], [], [0])]
|
[
|
||||||
|
[:send_bind_message]
|
||||||
|
]
|
||||||
}
|
}
|
||||||
# This is known to fail.
|
# This is known to fail.
|
||||||
# Server responds immediately, Proxy buffers the message
|
# Server responds immediately, Proxy buffers the message
|
||||||
@@ -135,155 +133,23 @@ describe "Portocol handling" do
|
|||||||
|
|
||||||
context "Simple message" do
|
context "Simple message" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[SimpleQueryMessage.new("SELECT 1")]
|
[[:send_query_message, "SELECT 1"]]
|
||||||
}
|
}
|
||||||
|
|
||||||
it_behaves_like "at parity with database"
|
it_behaves_like "at parity with database"
|
||||||
end
|
end
|
||||||
|
|
||||||
10.times do |i|
|
|
||||||
context "Extended protocol" do
|
context "Extended protocol" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[
|
||||||
ParseMessage.new("", "SELECT 1", []),
|
[:send_parse_message, "SELECT 1"],
|
||||||
BindMessage.new("", "", [], [], [0]),
|
[:send_bind_message],
|
||||||
DescribeMessage.new("S", ""),
|
[:send_describe_message, "P"],
|
||||||
ExecuteMessage.new("", 1),
|
[:send_execute_message],
|
||||||
SyncMessage.new
|
[:send_sync_message],
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
it_behaves_like "at parity with database"
|
it_behaves_like "at parity with database"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Protocol-level prepared statements" do
|
|
||||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "transaction") }
|
|
||||||
before do
|
|
||||||
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
|
|
||||||
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
|
||||||
table_query = "CREATE TABLE IF NOT EXISTS employees (employee_id SERIAL PRIMARY KEY, salary NUMERIC(10, 2) CHECK (salary > 0));"
|
|
||||||
q_sock.send_message(SimpleQueryMessage.new(table_query))
|
|
||||||
q_sock.close
|
|
||||||
|
|
||||||
current_configs = processes.pgcat.current_config
|
|
||||||
current_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = 500
|
|
||||||
processes.pgcat.update_config(current_configs)
|
|
||||||
processes.pgcat.reload_config
|
|
||||||
end
|
|
||||||
after do
|
|
||||||
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
|
|
||||||
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
|
||||||
table_query = "DROP TABLE IF EXISTS employees;"
|
|
||||||
q_sock.send_message(SimpleQueryMessage.new(table_query))
|
|
||||||
q_sock.close
|
|
||||||
end
|
|
||||||
|
|
||||||
context "When unnamed prepared statements are used" do
|
|
||||||
it "does not cache them" do
|
|
||||||
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
|
||||||
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
|
||||||
|
|
||||||
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
|
||||||
socket.read_from_server
|
|
||||||
|
|
||||||
10.times do |i|
|
|
||||||
socket.send_message(ParseMessage.new("", "SELECT #{i}", []))
|
|
||||||
socket.send_message(BindMessage.new("", "", [], [], [0]))
|
|
||||||
socket.send_message(DescribeMessage.new("S", ""))
|
|
||||||
socket.send_message(ExecuteMessage.new("", 1))
|
|
||||||
socket.send_message(SyncMessage.new)
|
|
||||||
socket.read_from_server
|
|
||||||
end
|
|
||||||
|
|
||||||
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
|
|
||||||
result = socket.read_from_server
|
|
||||||
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
|
|
||||||
expect(number_of_saved_statements).to eq(0)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "When named prepared statements are used" do
|
|
||||||
it "caches them" do
|
|
||||||
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
|
||||||
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
|
||||||
|
|
||||||
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
|
||||||
socket.read_from_server
|
|
||||||
|
|
||||||
3.times do
|
|
||||||
socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
|
||||||
socket.send_message(BindMessage.new("", "my_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
|
|
||||||
socket.send_message(SyncMessage.new)
|
|
||||||
socket.read_from_server
|
|
||||||
end
|
|
||||||
|
|
||||||
3.times do
|
|
||||||
socket.send_message(ParseMessage.new("my_other_query", "SELECT * FROM employees WHERE salary in ($1,$2,$3)", [0,0,0]))
|
|
||||||
socket.send_message(BindMessage.new("", "my_other_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
|
|
||||||
socket.send_message(SyncMessage.new)
|
|
||||||
socket.read_from_server
|
|
||||||
end
|
|
||||||
|
|
||||||
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
|
|
||||||
result = socket.read_from_server
|
|
||||||
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
|
|
||||||
expect(number_of_saved_statements).to eq(2)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "When DISCARD ALL/DEALLOCATE ALL are called" do
|
|
||||||
it "resets server and client caches" do
|
|
||||||
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
|
||||||
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
|
||||||
|
|
||||||
20.times do |i|
|
|
||||||
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
|
||||||
end
|
|
||||||
|
|
||||||
20.times do |i|
|
|
||||||
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
|
|
||||||
end
|
|
||||||
|
|
||||||
socket.send_message(SyncMessage.new)
|
|
||||||
socket.read_from_server
|
|
||||||
|
|
||||||
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
|
||||||
socket.read_from_server
|
|
||||||
responses = []
|
|
||||||
4.times do |i|
|
|
||||||
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
|
||||||
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
|
|
||||||
socket.send_message(SyncMessage.new)
|
|
||||||
|
|
||||||
responses += socket.read_from_server
|
|
||||||
end
|
|
||||||
|
|
||||||
errors = responses.select { |message| message[:code] == 'E' }
|
|
||||||
error_message = errors.map { |message| message[:bytes].map(&:chr).join("") }.join("\n")
|
|
||||||
raise StandardError, "Encountered the following errors: #{error_message}" if errors.length > 0
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "Maximum number of bound paramters" do
|
|
||||||
it "does not crash" do
|
|
||||||
test_socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
|
||||||
test_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
|
||||||
|
|
||||||
types = Array.new(65_535) { |i| 0 }
|
|
||||||
|
|
||||||
params = Array.new(65_535) { |i| "$#{i+1}" }.join(",")
|
|
||||||
test_socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in (#{params})", types))
|
|
||||||
|
|
||||||
test_socket.send_message(BindMessage.new("my_query", "my_query", types, types.map(&:to_s), types))
|
|
||||||
|
|
||||||
test_socket.send_message(SyncMessage.new)
|
|
||||||
|
|
||||||
# If the proxy crashes, this will raise an error
|
|
||||||
expect { test_socket.read_from_server }.to_not raise_error
|
|
||||||
|
|
||||||
test_socket.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
664
tests/rust/Cargo.lock
generated
664
tests/rust/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -15,14 +15,16 @@ async fn test_prepared_statements() {
|
|||||||
for _ in 0..5 {
|
for _ in 0..5 {
|
||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = tokio::task::spawn(async move {
|
||||||
for i in 0..1000 {
|
for _ in 0..1000 {
|
||||||
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
|
match sqlx::query("SELECT one").fetch_all(&pool).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
if err.to_string().contains("prepared statement") {
|
||||||
panic!("prepared statement error: {}", err);
|
panic!("prepared statement error: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
handles.push(handle);
|
handles.push(handle);
|
||||||
|
|||||||
Reference in New Issue
Block a user