Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
52119f2333 chore(deps): bump helm/chart-testing-action from 2.2.1 to 2.6.1
Bumps [helm/chart-testing-action](https://github.com/helm/chart-testing-action) from 2.2.1 to 2.6.1.
- [Release notes](https://github.com/helm/chart-testing-action/releases)
- [Commits](https://github.com/helm/chart-testing-action/compare/v2.2.1...v2.6.1)

---
updated-dependencies:
- dependency-name: helm/chart-testing-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-22 17:27:25 +00:00
27 changed files with 387 additions and 2975 deletions

View File

@@ -59,7 +59,6 @@ admin_password = "admin_pass"
# session: one server connection per connected client # session: one server connection per connected client
# transaction: one server connection per client transaction # transaction: one server connection per client transaction
pool_mode = "transaction" pool_mode = "transaction"
prepared_statements_cache_size = 500
# If the client doesn't specify, route traffic to # If the client doesn't specify, route traffic to
# this role by default. # this role by default.
@@ -142,7 +141,6 @@ query_parser_enabled = true
query_parser_read_write_splitting = true query_parser_read_write_splitting = true
primary_reads_enabled = true primary_reads_enabled = true
sharding_function = "pg_bigint_hash" sharding_function = "pg_bigint_hash"
prepared_statements_cache_size = 500
[pools.simple_db.users.0] [pools.simple_db.users.0]
username = "simple_user" username = "simple_user"

View File

@@ -23,17 +23,14 @@ jobs:
steps: steps:
- name: Checkout Repository - name: Checkout Repository
uses: actions/checkout@v4 uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx - name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3 uses: docker/setup-buildx-action@v2
- name: Determine tags - name: Determine tags
id: metadata id: metadata
uses: docker/metadata-action@v5 uses: docker/metadata-action@v4
with: with:
images: ${{ env.registry }}/${{ env.image-name }} images: ${{ env.registry }}/${{ env.image-name }}
tags: | tags: |
@@ -45,18 +42,15 @@ jobs:
type=raw,value=latest,enable={{ is_default_branch }} type=raw,value=latest,enable={{ is_default_branch }}
- name: Log in to the Container registry - name: Log in to the Container registry
uses: docker/login-action@v3 uses: docker/login-action@v2.1.0
with: with:
registry: ${{ env.registry }} registry: ${{ env.registry }}
username: ${{ github.actor }} username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push ${{ env.image-name }} - name: Build and push ${{ env.image-name }}
uses: docker/build-push-action@v6 uses: docker/build-push-action@v3
with: with:
context: .
platforms: linux/amd64,linux/arm64
provenance: false
push: true push: true
tags: ${{ steps.metadata.outputs.tags }} tags: ${{ steps.metadata.outputs.tags }}
labels: ${{ steps.metadata.outputs.labels }} labels: ${{ steps.metadata.outputs.labels }}

View File

@@ -27,7 +27,7 @@ jobs:
python-version: 3.7 python-version: 3.7
- name: Set up chart-testing - name: Set up chart-testing
uses: helm/chart-testing-action@v2.2.1 uses: helm/chart-testing-action@v2.6.1
with: with:
version: v3.5.1 version: v3.5.1

View File

@@ -6,32 +6,6 @@ Thank you for contributing! Just a few tips here:
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`. 2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
3. Performance is important, make sure there are no regressions in your branch vs. `main`. 3. Performance is important, make sure there are no regressions in your branch vs. `main`.
## How to run the integration tests locally and iterate on them
We have integration tests written in Ruby, Python, Go and Rust.
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
Quite simply, make sure you have docker installed and then run
`./start_test_env.sh`
That is it!
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app && python3 tests/python/tests.py`
Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
To rebuild PgCat, just run `cargo build` within the container under `/app`
![Animated gif showing how to run tests](https://github.com/user-attachments/assets/2258fde3-2aed-4efb-bdc5-e4f12dcd4d33)
Happy hacking! Happy hacking!
## TODOs ## TODOs

104
Cargo.lock generated
View File

@@ -146,12 +146,6 @@ dependencies = [
"syn 2.0.26", "syn 2.0.26",
] ]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]] [[package]]
name = "atomic_enum" name = "atomic_enum"
version = "0.2.0" version = "0.2.0"
@@ -548,23 +542,29 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.6" version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049"
dependencies = [ dependencies = [
"atomic-waker",
"bytes", "bytes",
"fnv", "fnv",
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"futures-util",
"http", "http",
"indexmap", "indexmap 1.9.3",
"slab", "slab",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tracing", "tracing",
] ]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.14.0" version = "0.14.0"
@@ -609,9 +609,9 @@ dependencies = [
[[package]] [[package]]
name = "http" name = "http"
version = "1.1.0" version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@@ -620,24 +620,12 @@ dependencies = [
[[package]] [[package]]
name = "http-body" name = "http-body"
version = "1.0.1" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [ dependencies = [
"bytes", "bytes",
"http", "http",
]
[[package]]
name = "http-body-util"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
dependencies = [
"bytes",
"futures-util",
"http",
"http-body",
"pin-project-lite", "pin-project-lite",
] ]
@@ -655,12 +643,13 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.4.1" version = "0.14.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
"futures-core",
"futures-util", "futures-util",
"h2", "h2",
"http", "http",
@@ -669,26 +658,13 @@ dependencies = [
"httpdate", "httpdate",
"itoa", "itoa",
"pin-project-lite", "pin-project-lite",
"smallvec", "socket2 0.4.9",
"tokio", "tokio",
"tower-service",
"tracing",
"want", "want",
] ]
[[package]]
name = "hyper-util"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
dependencies = [
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "iana-time-zone" name = "iana-time-zone"
version = "0.1.57" version = "0.1.57"
@@ -733,6 +709,16 @@ dependencies = [
"unicode-normalization", "unicode-normalization",
] ]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.0.0" version = "2.0.0"
@@ -740,7 +726,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown", "hashbrown 0.14.0",
] ]
[[package]] [[package]]
@@ -862,7 +848,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
dependencies = [ dependencies = [
"hashbrown", "hashbrown 0.14.0",
] ]
[[package]] [[package]]
@@ -1034,7 +1020,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]] [[package]]
name = "pgcat" name = "pgcat"
version = "1.2.0" version = "1.1.2-dev4"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@@ -1048,9 +1034,7 @@ dependencies = [
"fallible-iterator", "fallible-iterator",
"futures", "futures",
"hmac", "hmac",
"http-body-util",
"hyper", "hyper",
"hyper-util",
"itertools", "itertools",
"jemallocator", "jemallocator",
"log", "log",
@@ -1494,9 +1478,9 @@ dependencies = [
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.13.2" version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
[[package]] [[package]]
name = "socket2" name = "socket2"
@@ -1526,9 +1510,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "sqlparser" name = "sqlparser"
version = "0.41.0" version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
dependencies = [ dependencies = [
"log", "log",
"sqlparser_derive", "sqlparser_derive",
@@ -1536,13 +1520,13 @@ dependencies = [
[[package]] [[package]]
name = "sqlparser_derive" name = "sqlparser_derive"
version = "0.2.2" version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 1.0.109",
] ]
[[package]] [[package]]
@@ -1757,13 +1741,19 @@ version = "0.19.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
dependencies = [ dependencies = [
"indexmap", "indexmap 2.0.0",
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
"winnow", "winnow",
] ]
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.37" version = "0.1.37"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "pgcat" name = "pgcat"
version = "1.2.0" version = "1.1.2-dev4"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1" regex = "1"
num_cpus = "1" num_cpus = "1"
once_cell = "1" once_cell = "1"
sqlparser = { version = "0.41", features = ["visitor"] } sqlparser = {version = "0.34", features = ["visitor"] }
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"
parking_lot = "0.12.1" parking_lot = "0.12.1"
@@ -29,9 +29,7 @@ base64 = "0.21"
stringprep = "0.1" stringprep = "0.1"
tokio-rustls = "0.24" tokio-rustls = "0.24"
rustls-pemfile = "1" rustls-pemfile = "1"
http-body-util = "0.1.2" hyper = { version = "0.14", features = ["full"] }
hyper = { version = "1.4.1", features = ["full"] }
hyper-util = { version = "0.1.7", features = ["tokio"] }
phf = { version = "0.11.1", features = ["macros"] } phf = { version = "0.11.1", features = ["macros"] }
exitcode = "1.1.2" exitcode = "1.1.2"
futures = "0.3" futures = "0.3"
@@ -49,12 +47,9 @@ serde_json = "1"
itertools = "0.10" itertools = "0.10"
clap = { version = "4.3.1", features = ["derive", "env"] } clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37" tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = [ tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
"json",
"env-filter",
"std",
] }
lru = "0.12.0" lru = "0.12.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies] [target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0" jemallocator = "0.5.0"

View File

@@ -1,4 +1,4 @@
FROM rust:1.79.0-slim-bookworm AS builder FROM rust:1-slim-bookworm AS builder
RUN apt-get update && \ RUN apt-get update && \
apt-get install -y build-essential apt-get install -y build-essential
@@ -19,4 +19,3 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat WORKDIR /etc/pgcat
ENV RUST_LOG=info ENV RUST_LOG=info
CMD ["pgcat"] CMD ["pgcat"]
STOPSIGNAL SIGINT

View File

@@ -1,4 +1,4 @@
FROM cimg/rust:1.79.0 FROM cimg/rust:1.67.1
COPY --from=sclevine/yj /bin/yj /bin/yj COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h RUN /bin/yj -h
RUN sudo apt-get update && \ RUN sudo apt-get update && \

View File

@@ -268,8 +268,6 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
Additionally, Prometheus statistics are available at `/metrics` via HTTP. Additionally, Prometheus statistics are available at `/metrics` via HTTP.
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
### Live configuration reloading ### Live configuration reloading
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations. The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.

View File

@@ -4,5 +4,5 @@ description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBounce
maintainers: maintainers:
- name: Wildcard - name: Wildcard
email: support@w6d.io email: support@w6d.io
appVersion: "1.2.0" appVersion: "1.1.1"
version: 0.2.0 version: 0.1.0

View File

@@ -240,15 +240,7 @@ configuration:
## the pool_name is what clients use as database name when connecting ## the pool_name is what clients use as database name when connecting
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded" ## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
## @param [object] ## @param [object]
pools: pools: []
[{
name: "simple", pool_mode: "transaction",
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
shards: [{
servers: [{host: "postgres", port: 5432, role: "primary"}],
database: "postgres"
}]
}]
# - ## default values # - ## default values
# ## # ##
# ## # ##

View File

@@ -1,4 +1,4 @@
FROM rust:bullseye FROM rust:1.70-bullseye
# Dependencies # Dependencies
COPY --from=sclevine/yj /bin/yj /bin/yj COPY --from=sclevine/yj /bin/yj /bin/yj

File diff suppressed because it is too large Load Diff

View File

@@ -11,7 +11,6 @@ RestartSec=1
Environment=RUST_LOG=info Environment=RUST_LOG=info
LimitNOFILE=65536 LimitNOFILE=65536
ExecStart=/usr/bin/pgcat /etc/pgcat.toml ExecStart=/usr/bin/pgcat /etc/pgcat.toml
ExecReload=/bin/kill -SIGHUP $MAINPID
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target

View File

@@ -55,12 +55,7 @@ where
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect(); let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
match query_parts match query_parts[0].to_ascii_uppercase().as_str() {
.first()
.unwrap_or(&"")
.to_ascii_uppercase()
.as_str()
{
"BAN" => { "BAN" => {
trace!("BAN"); trace!("BAN");
ban(stream, query_parts).await ban(stream, query_parts).await
@@ -89,12 +84,7 @@ where
trace!("SHUTDOWN"); trace!("SHUTDOWN");
shutdown(stream).await shutdown(stream).await
} }
"SHOW" => match query_parts "SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
.get(1)
.unwrap_or(&"")
.to_ascii_uppercase()
.as_str()
{
"HELP" => { "HELP" => {
trace!("SHOW HELP"); trace!("SHOW HELP");
show_help(stream).await show_help(stream).await

View File

@@ -38,12 +38,12 @@ pub enum Role {
Mirror, Mirror,
} }
impl std::fmt::Display for Role { impl ToString for Role {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn to_string(&self) -> String {
match self { match *self {
Role::Primary => write!(f, "primary"), Role::Primary => "primary".to_string(),
Role::Replica => write!(f, "replica"), Role::Replica => "replica".to_string(),
Role::Mirror => write!(f, "mirror"), Role::Mirror => "mirror".to_string(),
} }
} }
} }
@@ -476,11 +476,11 @@ pub enum PoolMode {
Session, Session,
} }
impl std::fmt::Display for PoolMode { impl ToString for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn to_string(&self) -> String {
match self { match *self {
PoolMode::Transaction => write!(f, "transaction"), PoolMode::Transaction => "transaction".to_string(),
PoolMode::Session => write!(f, "session"), PoolMode::Session => "session".to_string(),
} }
} }
} }
@@ -493,13 +493,12 @@ pub enum LoadBalancingMode {
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")] #[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
LeastOutstandingConnections, LeastOutstandingConnections,
} }
impl ToString for LoadBalancingMode {
impl std::fmt::Display for LoadBalancingMode { fn to_string(&self) -> String {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self {
match self { LoadBalancingMode::Random => "random".to_string(),
LoadBalancingMode::Random => write!(f, "random"),
LoadBalancingMode::LeastOutstandingConnections => { LoadBalancingMode::LeastOutstandingConnections => {
write!(f, "least_outstanding_connections") "least_outstanding_connections".to_string()
} }
} }
} }
@@ -1000,17 +999,15 @@ impl Config {
pub fn fill_up_auth_query_config(&mut self) { pub fn fill_up_auth_query_config(&mut self) {
for (_name, pool) in self.pools.iter_mut() { for (_name, pool) in self.pools.iter_mut() {
if pool.auth_query.is_none() { if pool.auth_query.is_none() {
pool.auth_query.clone_from(&self.general.auth_query); pool.auth_query = self.general.auth_query.clone();
} }
if pool.auth_query_user.is_none() { if pool.auth_query_user.is_none() {
pool.auth_query_user pool.auth_query_user = self.general.auth_query_user.clone();
.clone_from(&self.general.auth_query_user);
} }
if pool.auth_query_password.is_none() { if pool.auth_query_password.is_none() {
pool.auth_query_password pool.auth_query_password = self.general.auth_query_password.clone();
.clone_from(&self.general.auth_query_password);
} }
} }
} }
@@ -1158,7 +1155,7 @@ impl Config {
"Default max server lifetime: {}ms", "Default max server lifetime: {}ms",
self.general.server_lifetime self.general.server_lifetime
); );
info!("Server round robin: {}", self.general.server_round_robin); info!("Sever round robin: {}", self.general.server_round_robin);
match self.general.tls_certificate.clone() { match self.general.tls_certificate.clone() {
Some(tls_certificate) => { Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate); info!("TLS certificate: {}", tls_certificate);

View File

@@ -733,10 +733,6 @@ pub fn configure_socket(stream: &TcpStream) {
} }
Err(err) => error!("Could not configure socket: {}", err), Err(err) => error!("Could not configure socket: {}", err),
} }
match sock_ref.set_nodelay(true) {
Ok(_) => (),
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
}
} }
pub trait BytesMutReader { pub trait BytesMutReader {

View File

@@ -813,7 +813,7 @@ impl ConnectionPool {
} }
} }
client_stats.checkout_error(); client_stats.checkout_success();
Err(Error::AllServersDown) Err(Error::AllServersDown)
} }

View File

@@ -1,41 +1,23 @@
use http_body_util::Full; use hyper::service::{make_service_fn, service_fn};
use hyper::body; use hyper::{Body, Method, Request, Response, Server, StatusCode};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use log::{debug, error, info}; use log::{debug, error, info};
use phf::phf_map; use phf::phf_map;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use tokio::net::TcpListener; use std::sync::Arc;
use crate::config::Address; use crate::config::Address;
use crate::pool::{get_all_pools, PoolIdentifier}; use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::get_server_stats;
use crate::stats::pool::PoolStats; use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};
struct MetricHelpType { struct MetricHelpType {
help: &'static str, help: &'static str,
ty: &'static str, ty: &'static str,
} }
struct ServerPrometheusStats {
bytes_received: u64,
bytes_sent: u64,
transaction_count: u64,
query_count: u64,
error_count: u64,
active_count: u64,
idle_count: u64,
login_count: u64,
tested_count: u64,
}
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/ // reference for metric types: https://prometheus.io/docs/concepts/metric_types/
// counters only increase // counters only increase
// gauges can arbitrarily increase or decrease // gauges can arbitrarily increase or decrease
@@ -138,46 +120,22 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
}, },
"servers_bytes_received" => MetricHelpType { "servers_bytes_received" => MetricHelpType {
help: "Volume in bytes of network traffic received by server", help: "Volume in bytes of network traffic received by server",
ty: "counter", ty: "gauge",
}, },
"servers_bytes_sent" => MetricHelpType { "servers_bytes_sent" => MetricHelpType {
help: "Volume in bytes of network traffic sent by server", help: "Volume in bytes of network traffic sent by server",
ty: "counter", ty: "gauge",
}, },
"servers_transaction_count" => MetricHelpType { "servers_transaction_count" => MetricHelpType {
help: "Number of transactions executed by server", help: "Number of transactions executed by server",
ty: "counter", ty: "gauge",
}, },
"servers_query_count" => MetricHelpType { "servers_query_count" => MetricHelpType {
help: "Number of queries executed by server", help: "Number of queries executed by server",
ty: "counter", ty: "gauge",
}, },
"servers_error_count" => MetricHelpType { "servers_error_count" => MetricHelpType {
help: "Number of errors", help: "Number of errors",
ty: "counter",
},
"servers_idle_count" => MetricHelpType {
help: "Number of server connection in idle state",
ty: "gauge",
},
"servers_active_count" => MetricHelpType {
help: "Number of server connection in active state",
ty: "gauge",
},
"servers_tested_count" => MetricHelpType {
help: "Number of server connection in tested state",
ty: "gauge",
},
"servers_login_count" => MetricHelpType {
help: "Number of server connection in login state",
ty: "gauge",
},
"servers_is_banned" => MetricHelpType {
help: "0 if server is not banned, 1 if server is banned",
ty: "gauge",
},
"servers_is_paused" => MetricHelpType {
help: "0 if server is not paused, 1 if server is paused",
ty: "gauge", ty: "gauge",
}, },
"databases_pool_size" => MetricHelpType { "databases_pool_size" => MetricHelpType {
@@ -245,9 +203,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string()); labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels) Self::from_name(&format!("databases_{}", name), value, labels)
} }
@@ -262,9 +218,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string()); labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels) Self::from_name(&format!("servers_{}", name), value, labels)
} }
@@ -274,9 +229,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string()); labels.insert("shard", address.shard.to_string());
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels) Self::from_name(&format!("stats_{}", name), value, labels)
} }
@@ -290,9 +243,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
} }
} }
async fn prometheus_stats( async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
request: Request<body::Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
match (request.method(), request.uri().path()) { match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => { (&Method::GET, "/metrics") => {
let mut lines = Vec::new(); let mut lines = Vec::new();
@@ -378,51 +329,34 @@ fn push_database_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW SERVERS admin command. // Adds relevant metrics shown in a SHOW SERVERS admin command.
fn push_server_stats(lines: &mut Vec<String>) { fn push_server_stats(lines: &mut Vec<String>) {
let server_stats = get_server_stats(); let server_stats = get_server_stats();
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new(); let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
for (_, stats) in server_stats { for (_, stats) in server_stats {
let entry = prom_stats server_stats_by_addresses.insert(stats.address_name(), stats);
.entry(stats.address_name())
.or_insert(ServerPrometheusStats {
bytes_received: 0,
bytes_sent: 0,
transaction_count: 0,
query_count: 0,
error_count: 0,
active_count: 0,
idle_count: 0,
login_count: 0,
tested_count: 0,
});
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
entry.query_count += stats.query_count.load(Ordering::Relaxed);
entry.error_count += stats.error_count.load(Ordering::Relaxed);
match stats.state.load(Ordering::Relaxed) {
crate::stats::ServerState::Login => entry.login_count += 1,
crate::stats::ServerState::Active => entry.active_count += 1,
crate::stats::ServerState::Tested => entry.tested_count += 1,
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
} }
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
let address = pool.address(shard, server); let address = pool.address(shard, server);
if let Some(server_info) = prom_stats.get(&address.name()) { if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
let metrics = [ let metrics = [
("bytes_received", server_info.bytes_received), (
("bytes_sent", server_info.bytes_sent), "bytes_received",
("transaction_count", server_info.transaction_count), server_info.bytes_received.load(Ordering::Relaxed),
("query_count", server_info.query_count), ),
("error_count", server_info.error_count), ("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
("idle_count", server_info.idle_count), (
("active_count", server_info.active_count), "transaction_count",
("login_count", server_info.login_count), server_info.transaction_count.load(Ordering::Relaxed),
("tested_count", server_info.tested_count), ),
("is_banned", if pool.is_banned(address) { 1 } else { 0 }), (
("is_paused", if pool.paused() { 1 } else { 0 }), "query_count",
server_info.query_count.load(Ordering::Relaxed),
),
(
"error_count",
server_info.error_count.load(Ordering::Relaxed),
),
]; ];
for (key, value) in metrics { for (key, value) in metrics {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
@@ -440,35 +374,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
} }
pub async fn start_metric_server(http_addr: SocketAddr) { pub async fn start_metric_server(http_addr: SocketAddr) {
let listener = TcpListener::bind(http_addr); let http_service_factory =
let listener = match listener.await { make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) });
Ok(listener) => listener, let server = Server::bind(&http_addr).serve(http_service_factory);
Err(e) => {
error!("Failed to bind prometheus server to HTTP address: {}.", e);
return;
}
};
info!( info!(
"Exposing prometheus metrics on http://{}/metrics.", "Exposing prometheus metrics on http://{}/metrics.",
http_addr http_addr
); );
loop { if let Err(e) = server.await {
let stream = match listener.accept().await { error!("Failed to run HTTP server: {}.", e);
Ok((stream, _)) => stream,
Err(e) => {
error!("Error accepting connection: {}", e);
continue;
}
};
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(prometheus_stats))
.await
{
eprintln!("Error serving HTTP connection for metrics: {:?}", err);
}
});
} }
} }

View File

@@ -427,12 +427,8 @@ impl QueryRouter {
None => (), None => (),
}; };
let has_locks = !query.locks.is_empty();
if has_locks {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary. // If we already visited a write statement, we should be going to the primary.
if !visited_write_statement {
self.active_role = match self.primary_reads_enabled() { self.active_role = match self.primary_reads_enabled() {
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica. false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
true => None, // Any server role is fine in this case. true => None, // Any server role is fine in this case.
@@ -503,7 +499,6 @@ impl QueryRouter {
table: _, table: _,
on: _, on: _,
returning: _, returning: _,
ignore: _,
} => { } => {
// Not supported in postgres. // Not supported in postgres.
assert!(or.is_none()); assert!(or.is_none());
@@ -511,18 +506,14 @@ impl QueryRouter {
assert!(after_columns.is_empty()); assert!(after_columns.is_empty());
Self::process_table(table_name, &mut table_names); Self::process_table(table_name, &mut table_names);
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns)); Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
} }
}
Delete { Delete {
tables, tables,
from, from,
using, using,
selection, selection,
returning: _, returning: _,
order_by: _,
limit: _,
} => { } => {
if let Some(expr) = selection { if let Some(expr) = selection {
exprs.push(expr.clone()); exprs.push(expr.clone());
@@ -1162,29 +1153,6 @@ mod test {
} }
} }
#[test]
fn test_select_for_update() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let queries_in_primary_role = vec![
simple_query("BEGIN"), // Transaction start
simple_query("SELECT * FROM items WHERE id = 5 FOR UPDATE"),
simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
];
for query in queries_in_primary_role {
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
// query without lock do not change role
let query = simple_query("SELECT * FROM items WHERE id = 5");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
assert_eq!(qr.role(), None);
}
#[test] #[test]
fn test_infer_primary_reads_enabled() { fn test_infer_primary_reads_enabled() {
QueryRouter::setup(); QueryRouter::setup();
@@ -1399,19 +1367,6 @@ mod test {
assert!(!qr.query_parser_enabled()); assert!(!qr.query_parser_enabled());
} }
#[test]
fn test_query_parser() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
}
#[test] #[test]
fn test_update_from_pool_settings() { fn test_update_from_pool_settings() {
QueryRouter::setup(); QueryRouter::setup();

View File

@@ -14,11 +14,11 @@ pub enum ShardingFunction {
Sha1, Sha1,
} }
impl std::fmt::Display for ShardingFunction { impl ToString for ShardingFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn to_string(&self) -> String {
match self { match *self {
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"), ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
ShardingFunction::Sha1 => write!(f, "sha1"), ShardingFunction::Sha1 => "sha1".to_string(),
} }
} }
} }

View File

@@ -1,34 +0,0 @@
GREEN="\033[0;32m"
RED="\033[0;31m"
BLUE="\033[0;34m"
RESET="\033[0m"
cd tests/docker/
docker compose kill main || true
docker compose build main
docker compose down
docker compose up -d
# wait for the container to start
while ! docker compose exec main ls; do
echo "Waiting for test environment to start"
sleep 1
done
echo "==================================="
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
docker compose exec --workdir /app main cargo build
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
docker compose exec --workdir /app/tests/ruby main bundle install
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
echo "You can rebuild PgCat from within the container by running"
echo -e " ${GREEN}cargo build${RESET}"
echo "and then run the tests again"
echo "==================================="
docker compose exec --workdir /app/tests main bash

View File

@@ -1,3 +1,4 @@
version: "3"
services: services:
pg1: pg1:
image: postgres:14 image: postgres:14
@@ -47,8 +48,6 @@ services:
main: main:
build: . build: .
command: ["bash", "/app/tests/docker/run.sh"] command: ["bash", "/app/tests/docker/run.sh"]
environment:
- INTERACTIVE_TEST_ENVIRONMENT=true
volumes: volumes:
- ../../:/app/ - ../../:/app/
- /app/target/ - /app/target/

View File

@@ -5,38 +5,6 @@ rm /app/*.profraw || true
rm /app/pgcat.profdata || true rm /app/pgcat.profdata || true
rm -rf /app/cov || true rm -rf /app/cov || true
# Prepares the interactive test environment
#
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
ports=(5432 7432 8432 9432 10432)
for port in "${ports[@]}"; do
is_it_up=0
attempts=0
while [ $is_it_up -eq 0 ]; do
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo "PostgreSQL on port $port is up."
is_it_up=1
else
attempts=$((attempts+1))
if [ $attempts -gt 10 ]; then
echo "PostgreSQL on port $port is down, giving up."
exit 1
fi
echo "PostgreSQL on port $port is down, waiting for it to start."
sleep 1
fi
done
done
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
sleep 100000000000000000
exit 0
fi
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw" export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
export RUSTC_BOOTSTRAP=1 export RUSTC_BOOTSTRAP=1
export CARGO_INCREMENTAL=0 export CARGO_INCREMENTAL=0

View File

@@ -91,27 +91,6 @@ describe "Admin" do
end end
end end
[
"SHOW ME THE MONEY",
"SHOW ME THE WAY",
"SHOW UP",
"SHOWTIME",
"HAMMER TIME",
"SHOWN TO BE TRUE",
"SHOW ",
"SHOW ",
"SHOW 1",
";;;;;"
].each do |cmd|
describe "Bad command #{cmd}" do
it "does not panic and responds with PG::SystemError" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
expect { admin_conn.async_exec(cmd) }.to raise_error(PG::SystemError).with_message(/Unsupported/)
admin_conn.close
end
end
end
describe "PAUSE" do describe "PAUSE" do
it "pauses all pools" do it "pauses all pools" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string) admin_conn = PG::connect(processes.pgcat.admin_connection_string)

664
tests/rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,14 +15,16 @@ async fn test_prepared_statements() {
for _ in 0..5 { for _ in 0..5 {
let pool = pool.clone(); let pool = pool.clone();
let handle = tokio::task::spawn(async move { let handle = tokio::task::spawn(async move {
for i in 0..1000 { for _ in 0..1000 {
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await { match sqlx::query("SELECT one").fetch_all(&pool).await {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err); panic!("prepared statement error: {}", err);
} }
} }
} }
}
}); });
handles.push(handle); handles.push(handle);