Compare commits

..

12 Commits

Author SHA1 Message Date
Lev Kokotov
3ca28a62c4 Dont accept empty passwords 2023-03-30 18:09:01 -07:00
Lev Kokotov
b65c1ddd56 readme 2023-03-30 17:36:49 -07:00
Lev Kokotov
ed31053cdb Fix spec 2023-03-30 17:35:32 -07:00
Lev Kokotov
4969abf355 Hmm 2023-03-30 15:29:10 -07:00
Lev Kokotov
112c0bdae8 Rebased 2023-03-30 15:19:52 -07:00
Lev Kokotov
fef737ea43 fmt 2023-03-30 14:16:50 -07:00
Lev Kokotov
345ee88342 Warn when secrets are too short 2023-03-30 14:16:38 -07:00
Lev Kokotov
db3d6c3baa Some tests 2023-03-30 14:16:36 -07:00
Lev Kokotov
197c32b4e8 Readme 2023-03-30 14:15:30 -07:00
Lev Kokotov
6345c39bd5 fix ci config 2023-03-30 14:15:07 -07:00
Lev Kokotov
32b913af94 update admin 2023-03-30 14:15:07 -07:00
Lev Kokotov
5c673b4333 Zero-downtime password rotation 2023-03-30 14:15:05 -07:00
88 changed files with 3170 additions and 11171 deletions

View File

@@ -9,7 +9,7 @@ jobs:
# Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub.
# See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor
docker:
- image: ghcr.io/postgresml/pgcat-ci:latest
- image: ghcr.io/levkk/pgcat-ci:1.67
environment:
RUST_LOG: info
LLVM_PROFILE_FILE: /tmp/pgcat-%m-%p.profraw
@@ -63,9 +63,6 @@ jobs:
- run:
name: "Lint"
command: "cargo fmt --check"
- run:
name: "Clippy"
command: "cargo clippy --all --all-targets -- -Dwarnings"
- run:
name: "Tests"
command: "cargo clean && cargo build && cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh"

View File

@@ -39,7 +39,7 @@ log_client_connections = false
log_client_disconnections = false
# Reload config automatically if it changes.
autoreload = 15000
autoreload = true
# TLS
tls_certificate = ".circleci/server.cert"
@@ -74,10 +74,6 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
@@ -138,7 +134,6 @@ database = "shard2"
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"

View File

@@ -108,24 +108,8 @@ cd ../..
pip3 install -r tests/python/requirements.txt
python3 tests/python/tests.py || exit 1
#
# Go tests
# Starts its own pgcat server
#
pushd tests/go
/usr/local/go/bin/go test || exit 1
popd
start_pgcat "info"
#
# Rust tests
#
cd tests/rust
cargo run
cd ../../
# Admin tests
export PGPASSWORD=admin_pass
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null

View File

@@ -1,14 +0,0 @@
root = true
[*]
trim_trailing_whitespace = true
insert_final_newline = true
[*.rs]
indent_style = space
indent_size = 4
max_line_length = 120
[*.toml]
indent_style = space
indent_size = 2

View File

@@ -1,11 +1,6 @@
name: Build and Push
on:
push:
branches:
- main
tags:
- v*
on: push
env:
registry: ghcr.io
@@ -34,7 +29,6 @@ jobs:
tags: |
type=sha,prefix=,format=long
type=schedule
type=ref,event=tag
type=ref,event=branch
type=ref,event=pr
type=raw,value=latest,enable={{ is_default_branch }}

View File

@@ -1,48 +0,0 @@
name: pgcat package (deb)
on:
workflow_dispatch:
inputs:
packageVersion:
default: "1.1.2-dev1"
jobs:
build:
strategy:
max-parallel: 1
fail-fast: false # Let the other job finish, or they can lock each other out
matrix:
os: ["buildjet-4vcpu-ubuntu-2204", "buildjet-4vcpu-ubuntu-2204-arm"]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
TZ: Etc/UTC
run: |
curl -sLO https://github.com/deb-s3/deb-s3/releases/download/0.11.4/deb-s3-0.11.4.gem
sudo gem install deb-s3-0.11.4.gem
dpkg-deb --version
- name: Build and release package
env:
AWS_ACCESS_KEY_ID: ${{ vars.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: ${{ vars.AWS_DEFAULT_REGION }}
run: |
if [[ $(arch) == "x86_64" ]]; then
export ARCH=amd64
else
export ARCH=arm64
fi
bash utilities/deb.sh ${{ inputs.packageVersion }}
deb-s3 upload \
--lock \
--bucket apt.postgresml.org \
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
--codename $(lsb_release -cs)

1
.gitignore vendored
View File

@@ -10,4 +10,3 @@ lcov.info
dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv

183
CONFIG.md
View File

@@ -1,4 +1,4 @@
# PgCat Configurations
# PgCat Configurations
## `general` Section
### host
@@ -49,46 +49,6 @@ default: 30000 # milliseconds
How long an idle connection with a server is left open (ms).
### server_lifetime
```
path: general.server_lifetime
default: 86400000 # 24 hours
```
Max connection lifetime before it's closed, even if actively used.
### server_round_robin
```
path: general.server_round_robin
default: false
```
Whether to use round robin for server selection or not.
### server_tls
```
path: general.server_tls
default: false
```
Whether to use TLS for server connections or not.
### verify_server_certificate
```
path: general.verify_server_certificate
default: false
```
Whether to verify server certificate or not.
### verify_config
```
path: general.verify_config
default: true
```
Whether to verify config or not.
### idle_client_in_transaction_timeout
```
path: general.idle_client_in_transaction_timeout
@@ -148,10 +108,10 @@ If we should log client disconnections
### autoreload
```
path: general.autoreload
default: 15000 # milliseconds
default: false
```
When set, PgCat automatically reloads its configurations at the specified interval (in milliseconds) if it detects changes in the configuration file. The default interval is 15000 milliseconds or 15 seconds.
When set to true, PgCat reloads configs if it detects a change in the config file.
### worker_threads
```
@@ -183,13 +143,7 @@ path: general.tcp_keepalives_interval
default: 5
```
### tcp_user_timeout
```
path: general.tcp_user_timeout
default: 10000
```
A linux-only parameters that defines the amount of time in milliseconds that transmitted data may remain unacknowledged or buffered data may remain untransmitted (due to zero window size) before TCP will forcibly disconnect
Number of seconds between keepalive packets.
### tls_certificate
```
@@ -198,7 +152,7 @@ default: <UNSET>
example: "server.cert"
```
Path to TLS Certificate file to use for TLS connections
Path to TLS Certficate file to use for TLS connections
### tls_private_key
```
@@ -221,60 +175,41 @@ Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DAT
### admin_password
```
path: general.admin_password
default: "admin_pass"
default: <UNSET>
```
Password to access the virtual administrative database
### auth_query
### auth_query (experimental)
```
path: general.auth_query
default: <UNSET>
example: "SELECT $1"
```
Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
established using the database configured in the pool. This parameter is inherited by every pool
and can be redefined in pool configuration.
### auth_query_user
### auth_query_user (experimental)
```
path: general.auth_query_user
default: <UNSET>
example: "sharding_user"
```
User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
This parameter is inherited by every pool and can be redefined in pool configuration.
### auth_query_password
### auth_query_password (experimental)
```
path: general.auth_query_password
default: <UNSET>
example: "sharding_user"
```
Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
This parameter is inherited by every pool and can be redefined in pool configuration.
### dns_cache_enabled
```
path: general.dns_cache_enabled
default: false
```
When enabled, ip resolutions for server connections specified using hostnames will be cached
and checked for changes every `dns_max_ttl` seconds. If a change in the host resolution is found
old ip connections are closed (gracefully) and new connections will start using new ip.
### dns_max_ttl
```
path: general.dns_max_ttl
default: 30
```
Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
## `pools.<pool_name>` Section
### pool_mode
@@ -295,7 +230,7 @@ default: "random"
Load balancing mode
`random` selects the server at random
`loc` selects the server with the least outstanding busy connections
`loc` selects the server with the least outstanding busy conncetions
### default_role
```
@@ -308,16 +243,7 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified.
### prepared_statements_cache_size
```
path: general.prepared_statements_cache_size
default: 0
```
Size of the prepared statements cache. 0 means disabled.
TODO: update documentation
### query_parser_enabled
### query_parser_enabled (experimental)
```
path: pools.<pool_name>.query_parser_enabled
default: true
@@ -338,7 +264,7 @@ If the query parser is enabled and this setting is enabled, the primary will be
load balancing of read queries. Otherwise, the primary will only be used for write
queries. The primary can always be explicitly selected with our custom protocol.
### sharding_key_regex
### sharding_key_regex (experimental)
```
path: pools.<pool_name>.sharding_key_regex
default: <UNSET>
@@ -360,40 +286,7 @@ Current options:
`pg_bigint_hash`: PARTITION BY HASH (Postgres hashing function)
`sha1`: A hashing function based on SHA1
### auth_query
```
path: pools.<pool_name>.auth_query
default: <UNSET>
example: "SELECT $1"
```
Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
established using the database configured in the pool. This parameter is inherited by every pool
and can be redefined in pool configuration.
### auth_query_user
```
path: pools.<pool_name>.auth_query_user
default: <UNSET>
example: "sharding_user"
```
User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
This parameter is inherited by every pool and can be redefined in pool configuration.
### auth_query_password
```
path: pools.<pool_name>.auth_query_password
default: <UNSET>
example: "sharding_user"
```
Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
This parameter is inherited by every pool and can be redefined in pool configuration.
### automatic_sharding_key
### automatic_sharding_key (experimental)
```
path: pools.<pool_name>.automatic_sharding_key
default: <UNSET>
@@ -418,6 +311,30 @@ default: 3000
Connect timeout can be overwritten in the pool
### auth_query (experimental)
```
path: general.auth_query
default: <UNSET>
```
Auth query can be overwritten in the pool
### auth_query_user (experimental)
```
path: general.auth_query_user
default: <UNSET>
```
Auth query user can be overwritten in the pool
### auth_query_password (experimental)
```
path: general.auth_query_password
default: <UNSET>
```
Auth query password can be overwritten in the pool
## `pools.<pool_name>.users.<user_index>` Section
### username
@@ -426,8 +343,7 @@ path: pools.<pool_name>.users.<user_index>.username
default: "sharding_user"
```
PostgreSQL username used to authenticate the user and connect to the server
if `server_username` is not set.
Postgresql username
### password
```
@@ -435,26 +351,7 @@ path: pools.<pool_name>.users.<user_index>.password
default: "sharding_user"
```
PostgreSQL password used to authenticate the user and connect to the server
if `server_password` is not set.
### server_username
```
path: pools.<pool_name>.users.<user_index>.server_username
default: <UNSET>
example: "another_user"
```
PostgreSQL username used to connect to the server.
### server_password
```
path: pools.<pool_name>.users.<user_index>.server_password
default: <UNSET>
example: "another_password"
```
PostgreSQL password used to connect to the server.
Postgresql password
### pool_size
```
@@ -485,7 +382,7 @@ default: [["127.0.0.1", 5432, "primary"], ["localhost", 5432, "replica"]]
Array of servers in the shard, each server entry is an array of `[host, port, role]`
### mirrors
### mirrors (experimental)
```
path: pools.<pool_name>.shards.<shard_index>.mirrors
default: <UNSET>

View File

@@ -2,7 +2,7 @@
Thank you for contributing! Just a few tips here:
1. `cargo fmt` and `cargo clippy` your code before opening up a PR
1. `cargo fmt` your code before opening up a PR
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
3. Performance is important, make sure there are no regressions in your branch vs. `main`.

1344
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -8,26 +8,27 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
bytes = "1"
md-5 = "0.10"
bb8 = "0.8.1"
bb8 = "0.8.0"
async-trait = "0.1"
rand = "0.8"
chrono = "0.4"
sha-1 = "0.10"
toml = "0.7"
serde = { version = "1", features = ["derive"] }
serde = "1"
serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = {version = "0.34", features = ["visitor"] }
sqlparser = "0.32.0"
log = "0.4"
arc-swap = "1"
env_logger = "0.10"
parking_lot = "0.12.1"
hmac = "0.12"
sha2 = "0.10"
base64 = "0.21"
stringprep = "0.1"
tokio-rustls = "0.24"
tokio-rustls = "0.23"
rustls-pemfile = "1"
hyper = { version = "0.14", features = ["full"] }
phf = { version = "0.11.1", features = ["macros"] }
@@ -36,20 +37,8 @@ futures = "0.3"
socket2 = { version = "0.4.7", features = ["all"] }
nix = "0.26.2"
atomic_enum = "0.2.0"
postgres-protocol = "0.6.5"
postgres-protocol = "0.6.4"
fallible-iterator = "0.2"
pin-project = "1"
webpki-roots = "0.23"
rustls = { version = "0.21", features = ["dangerous_configuration"] }
trust-dns-resolver = "0.22.0"
tokio-test = "0.4.2"
serde_json = "1"
itertools = "0.10"
clap = { version = "4.4.18", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
lru = "0.12.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -1,19 +1,9 @@
FROM rust:1-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y build-essential
FROM rust:1 AS builder
COPY . /app
WORKDIR /app
RUN cargo build --release
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -o Dpkg::Options::=--force-confdef -yq --no-install-recommends \
postgresql-client \
# Clean up layer
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& truncate -s 0 /var/log/*log
FROM debian:bullseye-slim
COPY --from=builder /app/target/release/pgcat /usr/bin/pgcat
COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat

View File

@@ -1,6 +1,4 @@
FROM cimg/rust:1.67.1
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN sudo apt-get update && \
sudo apt-get install -y \
psmisc postgresql-contrib-14 postgresql-client-14 libpq-dev \
@@ -9,9 +7,6 @@ RUN sudo apt-get update && \
sudo apt-get upgrade curl && \
cargo install cargo-binutils rustfilt && \
rustup component add llvm-tools-preview && \
pip3 install psycopg2 && sudo gem install bundler && \
pip3 install psycopg2 && sudo gem install bundler && \
wget -O /tmp/toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i /tmp/toxiproxy-2.4.0.deb
RUN wget -O /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz

View File

@@ -1,25 +0,0 @@
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
RUN apt-get update && \
apt-get install -y build-essential
WORKDIR /app
FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Build dependencies - this is the caching Docker layer!
RUN cargo chef cook --release --recipe-path recipe.json
# Build application
COPY . .
RUN cargo build
FROM debian:bookworm-slim
COPY --from=builder /app/target/release/pgcat /usr/bin/pgcat
COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat
ENV RUST_LOG=info
CMD ["pgcat"]

View File

@@ -18,56 +18,25 @@ PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load bal
| Failover | **Stable** | Queries are automatically rerouted around broken replicas, validated by regular health checks. |
| Admin database statistics | **Stable** | Pooler statistics and administration via the `pgbouncer` and `pgcat` databases. |
| Prometheus statistics | **Stable** | Statistics are reported via a HTTP endpoint for Prometheus. |
| SSL/TLS | **Stable** | Clients can connect to the pooler using TLS. Pooler can connect to Postgres servers using TLS. |
| Client TLS | **Stable** | Clients can connect to the pooler using TLS/SSL. |
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
| Auth passthrough | **Stable** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file.|
| Sharding using extended SQL syntax | **Experimental** | Clients can dynamically configure the pooler to route queries to specific shards. |
| Sharding using comments parsing/Regex | **Experimental** | Clients can include shard information (sharding key, shard ID) in the query comments. |
| Automatic sharding | **Experimental** | PgCat can parse queries, detect sharding keys automatically, and route queries to the correct shard. |
| Mirroring | **Experimental** | Mirror queries between multiple databases in order to test servers with realistic production traffic. |
| Auth passthrough | **Experimental** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file. |
| Password rotation | **Experimental** | Allows to rotate passwords without downtime or using third-party tools to manage Postgres authentication. |
## Status
PgCat is stable and used in production to serve hundreds of thousands of queries per second.
PgCat is stable and used in production to serve hundreds of thousands of queries per second. Some features remain experimental and are being actively developed. They are optional and can be enabled through configuration.
<table>
<tr>
<td>
<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f">
<img src="./images/instacart.webp" height="70" width="auto">
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
<img src="./images/postgresml.webp" height="70" width="auto">
</a>
</td>
<td>
<a href="https://onesignal.com">
<img src="./images/one_signal.webp" height="70" width="auto">
</a>
</td>
</tr>
<tr>
<td>
<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f">
Instacart
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
PostgresML
</a>
</td>
<td>
OneSignal
</td>
</tr>
</table>
Some features remain experimental and are being actively developed. They are optional and can be enabled through configuration.
| | |
|-|-|
|<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f"><img src="./images/instacart.webp" height="70" width="auto"></a>|<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second"><img src="./images/postgresml.webp" height="70" width="auto"></a>|
| [Instacart](https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f) | [PostgresML](https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second) |
## Deployment
@@ -131,7 +100,7 @@ You can open a Docker development environment where you can debug tests easier.
./dev/script/console
```
This will open a terminal in an environment similar to that used in tests. In there, you can compile the pooler, run tests, do some debugging with the test environment, etc. Objects compiled inside the container (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have on your machine.
This will open a terminal in an environment similar to that used in tests. In there, you can compile the pooler, run tests, do some debugging with the test environment, etc. Objects compiled inside the contaner (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have on your machine.
## Usage
@@ -276,6 +245,12 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process or by qu
Mirroring allows to route queries to multiple databases at the same time. This is useful for prewarning replicas before placing them into the active configuration, or for testing different versions of Postgres with live traffic.
### Password rotation
Password rotation allows to specify multiple passwords for a user, so they can connect to PgCat with multiple credentials. This allows distributed applications to change their configuration (connection strings) gradually and for PgCat to monitor their progression in admin statistics. Once the new secret is deployed everywhere, the old one can be removed from PgCat.
This also decouples server passwords from client passwords, allowing to change one without necessarily changing the other.
## License
PgCat is free and open source, released under the MIT license.

View File

@@ -1,9 +0,0 @@
Package: pgcat
Version: ${PACKAGE_VERSION}
Section: database
Priority: optional
Architecture: ${ARCH}
Maintainer: PostgresML <team@postgresml.org>
Homepage: https://postgresml.org
Description: PgCat - NextGen PostgreSQL Pooler
PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.

View File

@@ -1,8 +1,6 @@
FROM rust:1.70-bullseye
FROM rust:bullseye
# Dependencies
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN apt-get update -y \
&& apt-get install -y \
llvm-11 psmisc postgresql-contrib postgresql-client \

View File

@@ -25,7 +25,7 @@ x-common-env-pg:
services:
main:
image: gcr.io/google_containers/pause:3.2
image: kubernetes/pause
ports:
- 6432

View File

@@ -38,6 +38,9 @@ log_client_connections = false
# If we should log client disconnections
log_client_disconnections = false
# Reload config automatically if it changes.
autoreload = false
# TLS
# tls_certificate = "server.cert"
# tls_private_key = "server.key"
@@ -71,13 +74,9 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

View File

@@ -1,22 +0,0 @@
# This is an example of the most basic config
# that will mimic what PgBouncer does in transaction mode with one server.
[general]
host = "0.0.0.0"
port = 6433
admin_username = "pgcat"
admin_password = "pgcat"
[pools.pgml.users.0]
username = "postgres"
password = "postgres"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"
[pools.pgml.shards.0]
servers = [
["127.0.0.1", 28815, "primary"]
]
database = "postgres"

View File

@@ -1,16 +0,0 @@
[Unit]
Description=PgCat pooler
After=network.target
StartLimitIntervalSec=0
[Service]
User=pgcat
Type=simple
Restart=always
RestartSec=1
Environment=RUST_LOG=info
LimitNOFILE=65536
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
[Install]
WantedBy=multi-user.target

View File

@@ -23,9 +23,6 @@ connect_timeout = 5000 # milliseconds
# How long an idle connection with a server is left open (ms).
idle_timeout = 30000 # milliseconds
# Max connection lifetime before it's closed, even if actively used.
server_lifetime = 86400000 # 24 hours
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout = 0 # milliseconds
@@ -48,7 +45,7 @@ log_client_connections = false
log_client_disconnections = false
# When set to true, PgCat reloads configs if it detects a change in the config file.
autoreload = 15000
autoreload = false
# Number of worker threads the Runtime will use (4 by default).
worker_threads = 5
@@ -60,75 +57,17 @@ tcp_keepalives_count = 5
# Number of seconds between keepalive packets.
tcp_keepalives_interval = 5
# Path to TLS Certificate file to use for TLS connections
# Path to TLS Certficate file to use for TLS connections
# tls_certificate = ".circleci/server.cert"
# Path to TLS private key file to use for TLS connections
# tls_private_key = ".circleci/server.key"
# Enable/disable server TLS
server_tls = false
# Verify server certificate is completely authentic.
verify_server_certificate = false
# User name to access the virtual administrative database (pgbouncer or pgcat)
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
admin_username = "admin_user"
# Password to access the virtual administrative database
admin_password = "admin_pass"
# Default plugins that are configured on all pools.
[plugins]
# Prewarmer plugin that runs queries on server startup, before giving the connection
# to the client.
[plugins.prewarmer]
enabled = false
queries = [
"SELECT pg_prewarm('pgbench_accounts')",
]
# Log all queries to stdout.
[plugins.query_logger]
enabled = false
# Block access to tables that Postgres does not allow us to control.
[plugins.table_access]
enabled = false
tables = [
"pg_user",
"pg_roles",
"pg_database",
]
# Intercept user queries and give a fake reply.
[plugins.intercept]
enabled = true
[plugins.intercept.queries.0]
query = "select current_database() as a, current_schemas(false) as b"
schema = [
["a", "text"],
["b", "text"],
]
result = [
["${DATABASE}", "{public}"],
]
[plugins.intercept.queries.1]
query = "select current_database(), current_schema(), current_user"
schema = [
["current_database", "text"],
["current_schema", "text"],
["current_user", "text"],
]
result = [
["${DATABASE}", "public", "${USER}"],
]
# pool configs are structured as pool.<pool_name>
# the pool_name is what clients use as database name when connecting.
# For a pool named `sharded_db`, clients access that pool using connection string like
@@ -150,20 +89,12 @@ load_balancing_mode = "random"
# `primary` all queries go to the primary unless otherwise specified.
default_role = "any"
# Prepared statements cache size.
# TODO: update documentation
prepared_statements_cache_size = 500
# If Query Parser is enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
@@ -175,12 +106,6 @@ primary_reads_enabled = true
# shard_id_regex = '/\* shard_id: (\d+) \*/'
# regex_search_limit = 1000 # only look at the first 1000 characters of SQL statements
# Defines the behavior when no shard is selected in a sharded system.
# `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# no_shard_specified_behavior = "shard_0"
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
# Current options:
@@ -188,21 +113,6 @@ primary_reads_enabled = true
# `sha1`: A hashing function based on SHA1
sharding_function = "pg_bigint_hash"
# Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
# established using the database configured in the pool. This parameter is inherited by every pool
# and can be redefined in pool configuration.
# auth_query="SELECT usename, passwd FROM pg_shadow WHERE usename='$1'"
# User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
# specified in `auth_query_user`. The connection will be established using the database configured in the pool.
# This parameter is inherited by every pool and can be redefined in pool configuration.
# auth_query_user = "sharding_user"
# Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
# specified in `auth_query_user`. The connection will be established using the database configured in the pool.
# This parameter is inherited by every pool and can be redefined in pool configuration.
# auth_query_password = "sharding_user"
# Automatically parse this from queries and route queries to the right shard!
# automatic_sharding_key = "data.id"
@@ -212,86 +122,26 @@ idle_timeout = 40000
# Connect timeout can be overwritten in the pool
connect_timeout = 3000
# When enabled, ip resolutions for server connections specified using hostnames will be cached
# and checked for changes every `dns_max_ttl` seconds. If a change in the host resolution is found
# old ip connections are closed (gracefully) and new connections will start using new ip.
# dns_cache_enabled = false
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
# dns_max_ttl = 30
# Plugins can be configured on a pool-per-pool basis. This overrides the global plugins setting,
# so all plugins have to be configured here again.
[pool.sharded_db.plugins]
[pools.sharded_db.plugins.prewarmer]
enabled = true
queries = [
"SELECT pg_prewarm('pgbench_accounts')",
]
[pools.sharded_db.plugins.query_logger]
enabled = false
[pools.sharded_db.plugins.table_access]
enabled = false
tables = [
"pg_user",
"pg_roles",
"pg_database",
]
[pools.sharded_db.plugins.intercept]
enabled = true
[pools.sharded_db.plugins.intercept.queries.0]
query = "select current_database() as a, current_schemas(false) as b"
schema = [
["a", "text"],
["b", "text"],
]
result = [
["${DATABASE}", "{public}"],
]
[pools.sharded_db.plugins.intercept.queries.1]
query = "select current_database(), current_schema(), current_user"
schema = [
["current_database", "text"],
["current_schema", "text"],
["current_user", "text"],
]
result = [
["${DATABASE}", "public", "${USER}"],
]
# auth_query = "SELECT * FROM public.user_lookup('$1')"
# auth_query_user = "postgres"
# auth_query_password = "postgres"
# User configs are structured as pool.<pool_name>.users.<user_index>
# This section holds the credentials for users that may connect to this cluster
# This secion holds the credentials for users that may connect to this cluster
[pools.sharded_db.users.0]
# PostgreSQL username used to authenticate the user and connect to the server
# if `server_username` is not set.
# Postgresql username
username = "sharding_user"
# PostgreSQL password used to authenticate the user and connect to the server
# if `server_password` is not set.
# Postgresql password
password = "sharding_user"
pool_mode = "transaction"
# PostgreSQL username used to connect to the server.
# server_username = "another_user"
# PostgreSQL password used to connect to the server.
# server_password = "another_password"
# # Passwords the client can use to connect. Useful for password rotations.
# secrets = [ "secret_one", "secret_two" ]
# Maximum number of server connections that can be established for this user
# The maximum number of connection from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users.
pool_size = 9
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
# 0 means it is disabled.
statement_timeout = 0
@@ -301,8 +151,6 @@ username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 15000
connect_timeout = 1000
idle_timeout = 1000
# Shard configs are structured as pool.<pool_name>.shards.<shard_id>
# Each shard config contains a list of servers that make up the shard
@@ -338,8 +186,6 @@ sharding_function = "pg_bigint_hash"
username = "simple_user"
password = "simple_user"
pool_size = 5
min_pool_size = 3
server_lifetime = 60000
statement_timeout = 0
[pools.simple_db.shards.0]

View File

@@ -1,9 +0,0 @@
#!/bin/bash
set -e
systemctl daemon-reload
systemctl enable pgcat
if ! id pgcat 2> /dev/null; then
useradd -s /usr/bin/false pgcat
fi

4
postrm
View File

@@ -1,4 +0,0 @@
#!/bin/bash
set -e
systemctl daemon-reload

5
prerm
View File

@@ -1,5 +0,0 @@
#!/bin/bash
set -e
systemctl stop pgcat
systemctl disable pgcat

View File

@@ -1,6 +1,4 @@
use crate::pool::BanReason;
use crate::server::ServerParameters;
use crate::stats::pool::PoolStats;
use bytes::{Buf, BufMut, BytesMut};
use log::{error, info, trace};
use nix::sys::signal::{self, Signal};
@@ -14,20 +12,20 @@ use tokio::time::Instant;
use crate::config::{get_config, reload_config, VERSION};
use crate::errors::Error;
use crate::messages::*;
use crate::pool::ClientServerMap;
use crate::pool::{get_all_pools, get_pool};
use crate::stats::{get_client_stats, get_server_stats, ClientState, ServerState};
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
use crate::ClientServerMap;
pub fn generate_server_parameters_for_admin() -> ServerParameters {
let mut server_parameters = ServerParameters::new();
pub fn generate_server_info_for_admin() -> BytesMut {
let mut server_info = BytesMut::new();
server_parameters.set_param("application_name".to_string(), "".to_string(), true);
server_parameters.set_param("client_encoding".to_string(), "UTF8".to_string(), true);
server_parameters.set_param("server_encoding".to_string(), "UTF8".to_string(), true);
server_parameters.set_param("server_version".to_string(), VERSION.to_string(), true);
server_parameters.set_param("DateStyle".to_string(), "ISO, MDY".to_string(), true);
server_info.put(server_parameter_message("application_name", ""));
server_info.put(server_parameter_message("client_encoding", "UTF8"));
server_info.put(server_parameter_message("server_encoding", "UTF8"));
server_info.put(server_parameter_message("server_version", VERSION));
server_info.put(server_parameter_message("DateStyle", "ISO, MDY"));
server_parameters
server_info
}
/// Handle admin client.
@@ -74,21 +72,17 @@ where
}
"PAUSE" => {
trace!("PAUSE");
pause(stream, query_parts).await
pause(stream, query_parts[1]).await
}
"RESUME" => {
trace!("RESUME");
resume(stream, query_parts).await
resume(stream, query_parts[1]).await
}
"SHUTDOWN" => {
trace!("SHUTDOWN");
shutdown(stream).await
}
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
"HELP" => {
trace!("SHOW HELP");
show_help(stream).await
}
"BANS" => {
trace!("SHOW BANS");
show_bans(stream).await
@@ -260,50 +254,41 @@ async fn show_pools<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let pool_lookup = PoolStats::construct_pool_lookup();
let mut res = BytesMut::new();
res.put(row_description(&PoolStats::generate_header()));
pool_lookup.iter().for_each(|(_identifier, pool_stats)| {
res.put(data_row(&pool_stats.generate_row()));
});
res.put(command_complete("SHOW"));
let all_pool_stats = get_pool_stats();
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
/// Show all available options.
async fn show_help<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let mut res = BytesMut::new();
let detail_msg = [
"",
"SHOW HELP|CONFIG|DATABASES|POOLS|CLIENTS|SERVERS|USERS|VERSION",
// "SHOW PEERS|PEER_POOLS", // missing PEERS|PEER_POOLS
// "SHOW FDS|SOCKETS|ACTIVE_SOCKETS|LISTS|MEM|STATE", // missing FDS|SOCKETS|ACTIVE_SOCKETS|MEM|STATE
"SHOW LISTS",
// "SHOW DNS_HOSTS|DNS_ZONES", // missing DNS_HOSTS|DNS_ZONES
"SHOW STATS", // missing STATS_TOTALS|STATS_AVERAGES|TOTALS
"SET key = arg",
"RELOAD",
"PAUSE [<db>, <user>]",
"RESUME [<db>, <user>]",
// "DISABLE <db>", // missing
// "ENABLE <db>", // missing
// "RECONNECT [<db>]", missing
// "KILL <db>",
// "SUSPEND",
"SHUTDOWN",
let columns = vec![
("database", DataType::Text),
("user", DataType::Text),
("secret", DataType::Text),
("pool_mode", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric),
("sv_active", DataType::Numeric),
("sv_idle", DataType::Numeric),
("sv_used", DataType::Numeric),
("sv_tested", DataType::Numeric),
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
res.put(notify("Console usage", detail_msg.join("\n\t")));
let mut res = BytesMut::new();
res.put(row_description(&columns));
for (_, pool_stats) in all_pool_stats {
let mut row = vec![
pool_stats.database(),
pool_stats.user(),
pool_stats.redacted_secret(),
pool_stats.pool_mode().to_string(),
];
pool_stats.populate_row(&mut row);
pool_stats.clear_maxwait();
res.put(data_row(&row));
}
res.put(command_complete("SHOW"));
// ReadyForQuery
@@ -351,17 +336,17 @@ where
let paused = pool.paused();
res.put(data_row(&vec![
address.name(), // name
address.host.to_string(), // host
address.port.to_string(), // port
database_name.to_string(), // database
pool_config.user.username.to_string(), // force_user
pool_config.user.pool_size.to_string(), // pool_size
pool_config.user.min_pool_size.unwrap_or(0).to_string(), // min_pool_size
"0".to_string(), // reserve_pool
pool_config.pool_mode.to_string(), // pool_mode
pool_config.user.pool_size.to_string(), // max_connections
pool_state.connections.to_string(), // current_connections
address.name(), // name
address.host.to_string(), // host
address.port.to_string(), // port
database_name.to_string(), // database
pool_config.user.username.to_string(), // force_user
pool_config.user.pool_size.to_string(), // pool_size
"0".to_string(), // min_pool_size
"0".to_string(), // reserve_pool
pool_config.pool_mode.to_string(), // pool_mode
pool_config.user.pool_size.to_string(), // max_connections
pool_state.connections.to_string(), // current_connections
match paused {
// paused
true => "1".to_string(),
@@ -690,8 +675,6 @@ where
("query_count", DataType::Numeric),
("error_count", DataType::Numeric),
("age_seconds", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
let new_map = get_client_stats();
@@ -699,7 +682,6 @@ where
res.put(row_description(&columns));
for (_, client) in new_map {
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
let row = vec![
format!("{:#010X}", client.client_id()),
client.pool_name(),
@@ -713,8 +695,6 @@ where
.duration_since(client.connect_time())
.as_secs()
.to_string(),
(max_wait / 1_000_000).to_string(),
(max_wait % 1_000_000).to_string(),
];
res.put(data_row(&row));
@@ -747,10 +727,6 @@ where
("bytes_sent", DataType::Numeric),
("bytes_received", DataType::Numeric),
("age_seconds", DataType::Numeric),
("prepare_cache_hit", DataType::Numeric),
("prepare_cache_miss", DataType::Numeric),
("prepare_cache_eviction", DataType::Numeric),
("prepare_cache_size", DataType::Numeric),
];
let new_map = get_server_stats();
@@ -774,22 +750,6 @@ where
.duration_since(server.connect_time())
.as_secs()
.to_string(),
server
.prepared_hit_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_miss_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_eviction_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_cache_size
.load(Ordering::Relaxed)
.to_string(),
];
res.put(data_row(&row));
@@ -806,128 +766,96 @@ where
}
/// Pause a pool. It won't pass any more queries to the backends.
async fn pause<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
async fn pause<T>(stream: &mut T, query: &str) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = match tokens.len() == 2 {
true => tokens[1].split(',').map(|part| part.trim()).collect(),
false => Vec::new(),
};
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
match parts.len() {
0 => {
for (_, pool) in get_all_pools() {
if parts.len() != 2 {
error_response(
stream,
"PAUSE requires a database and a user, e.g. PAUSE my_db,my_user",
)
.await
} else {
let database = parts[0];
let user = parts[1];
match get_pool(database, user, None) {
Some(pool) => {
pool.pause();
let mut res = BytesMut::new();
res.put(command_complete(&format!("PAUSE {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
let mut res = BytesMut::new();
res.put(command_complete("PAUSE"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
2 => {
let database = parts[0];
let user = parts[1];
match get_pool(database, user) {
Some(pool) => {
pool.pause();
let mut res = BytesMut::new();
res.put(command_complete(&format!("PAUSE {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
}
_ => error_response(stream, "usage: PAUSE [db, user]").await,
}
}
/// Resume a pool. Queries are allowed again.
async fn resume<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
async fn resume<T>(stream: &mut T, query: &str) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = match tokens.len() == 2 {
true => tokens[1].split(',').map(|part| part.trim()).collect(),
false => Vec::new(),
};
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
match parts.len() {
0 => {
for (_, pool) in get_all_pools() {
if parts.len() != 2 {
error_response(
stream,
"RESUME requires a database and a user, e.g. RESUME my_db,my_user",
)
.await
} else {
let database = parts[0];
let user = parts[1];
match get_pool(database, user, None) {
Some(pool) => {
pool.resume();
let mut res = BytesMut::new();
res.put(command_complete(&format!("RESUME {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
let mut res = BytesMut::new();
res.put(command_complete("RESUME"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
2 => {
let database = parts[0];
let user = parts[1];
match get_pool(database, user) {
Some(pool) => {
pool.resume();
let mut res = BytesMut::new();
res.put(command_complete(&format!("RESUME {},{}", database, user)));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
}
_ => error_response(stream, "usage: RESUME [db, user]").await,
}
}
@@ -969,13 +897,20 @@ where
res.put(row_description(&vec![
("name", DataType::Text),
("pool_mode", DataType::Text),
("secret", DataType::Text),
]));
for (user_pool, pool) in get_all_pools() {
let pool_config = &pool.settings;
let redacted_secret = match user_pool.secret {
Some(secret) => format!("****{}", &secret[secret.len() - 4..]),
None => "<no secret>".to_string(),
};
res.put(data_row(&vec![
user_pool.user.clone(),
pool_config.pool_mode.to_string(),
redacted_secret,
]));
}

452
src/auth.rs Normal file
View File

@@ -0,0 +1,452 @@
//! Module implementing various client authentication mechanisms.
//!
//! Currently supported: plain (via TLS), md5 (via TLS and plain text connection).
use crate::errors::Error;
use crate::tokio::io::AsyncReadExt;
use crate::{
auth_passthrough::AuthPassthrough,
config::get_config,
messages::{
error_response, md5_hash_password, md5_hash_second_pass, write_all, wrong_password,
},
pool::{get_pool, ConnectionPool},
};
use bytes::{BufMut, BytesMut};
use log::debug;
async fn refetch_auth_hash<S>(
pool: &ConnectionPool,
stream: &mut S,
username: &str,
pool_name: &str,
) -> Result<String, Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send,
{
let config = get_config();
debug!("Fetching auth hash");
if config.is_auth_query_configured() {
let address = pool.address(0, 0);
if let Some(apt) = AuthPassthrough::from_pool_settings(&pool.settings) {
let hash = apt.fetch_hash(address).await?;
debug!("Auth query succeeded");
return Ok(hash);
}
} else {
debug!("Auth query not configured on pool");
}
error_response(
stream,
&format!(
"No password set and auth passthrough failed for database: {}, user: {}",
pool_name, username
),
)
.await?;
Err(Error::ClientError(format!(
"Could not obtain hash for {{ username: {:?}, database: {:?} }}. Auth passthrough not enabled.",
pool_name, username
)))
}
/// Read 'p' message from client.
async fn response<R>(stream: &mut R) -> Result<Vec<u8>, Error>
where
R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send,
{
let code = match stream.read_u8().await {
Ok(code) => code,
Err(_) => {
return Err(Error::SocketError(
"Error reading password code from client".to_string(),
))
}
};
if code as char != 'p' {
return Err(Error::SocketError(format!("Expected p, got {}", code)));
}
let len = match stream.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::SocketError(
"Error reading password length from client".to_string(),
))
}
};
let mut response = vec![0; (len - 4) as usize];
// Too short to be a password (null-terminated)
if response.len() < 2 {
return Err(Error::ClientError(format!("Password response too short")));
}
match stream.read_exact(&mut response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::SocketError(
"Error reading password from client".to_string(),
))
}
};
Ok(response.to_vec())
}
/// Make sure the pool we authenticated to has at least one server connection
/// that can serve our request.
async fn validate_pool<W>(
stream: &mut W,
mut pool: ConnectionPool,
username: &str,
pool_name: &str,
) -> Result<(), Error>
where
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send,
{
if !pool.validated() {
match pool.validate().await {
Ok(_) => Ok(()),
Err(err) => {
error_response(
stream,
&format!("Pool down for database: {}, user: {}", pool_name, username,),
)
.await?;
Err(Error::ClientError(format!("Pool down: {:?}", err)))
}
}
} else {
Ok(())
}
}
/// Clear text authentication.
///
/// The client will send the password in plain text over the wire.
/// To protect against obvious security issues, this is only used over TLS.
///
/// Clear text authentication is used to support zero-downtime password rotation.
/// It allows the client to use multiple passwords when talking to the PgCat
/// while the password is being rotated across multiple app instances.
pub struct ClearText {
username: String,
pool_name: String,
application_name: String,
}
impl ClearText {
/// Create a new ClearText authentication mechanism.
pub fn new(username: &str, pool_name: &str, application_name: &str) -> ClearText {
ClearText {
username: username.to_string(),
pool_name: pool_name.to_string(),
application_name: application_name.to_string(),
}
}
/// Issue 'R' clear text challenge to client.
pub async fn challenge<W>(&self, stream: &mut W) -> Result<(), Error>
where
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send,
{
debug!("Sending plain challenge");
let mut msg = BytesMut::new();
msg.put_u8(b'R');
msg.put_i32(8);
msg.put_i32(3); // Clear text
write_all(stream, msg).await
}
/// Authenticate client with server password or secret.
pub async fn authenticate<R, W>(
&self,
read: &mut R,
write: &mut W,
) -> Result<Option<String>, Error>
where
R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send,
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send,
{
let response = response(read).await?;
let secret = String::from_utf8_lossy(&response[0..response.len() - 1]).to_string();
match get_pool(&self.pool_name, &self.username, Some(secret.clone())) {
None => match get_pool(&self.pool_name, &self.username, None) {
Some(pool) => {
match pool.settings.user.password {
Some(ref password) => {
if password != &secret {
wrong_password(write, &self.username).await?;
Err(Error::ClientError(format!(
"Invalid password {{ username: {}, pool_name: {}, application_name: {} }}",
self.username, self.pool_name, self.application_name
)))
} else {
validate_pool(write, pool, &self.username, &self.pool_name).await?;
Ok(None)
}
}
None => {
// Server is storing hashes, we can't query it for the plain text password.
error_response(
write,
&format!(
"No server password configured for database: {}, user: {}",
self.pool_name, self.username
),
)
.await?;
Err(Error::ClientError(format!(
"No server password configured for {{ username: {}, pool_name: {}, application_name: {} }}",
self.username, self.pool_name, self.application_name
)))
}
}
}
None => {
error_response(
write,
&format!(
"No pool configured for database: {}, user: {}",
self.pool_name, self.username
),
)
.await?;
Err(Error::ClientError(format!(
"Invalid pool name {{ username: {}, pool_name: {}, application_name: {} }}",
self.username, self.pool_name, self.application_name
)))
}
},
Some(pool) => {
validate_pool(write, pool, &self.username, &self.pool_name).await?;
Ok(Some(secret))
}
}
}
}
/// MD5 hash authentication.
///
/// Deprecated, but widely used everywhere, and currently required for poolers
/// to authencticate clients without involving Postgres.
///
/// Admin clients are required to use MD5.
pub struct Md5 {
username: String,
pool_name: String,
application_name: String,
salt: [u8; 4],
admin: bool,
}
impl Md5 {
pub fn new(username: &str, pool_name: &str, application_name: &str, admin: bool) -> Md5 {
let salt: [u8; 4] = [
rand::random(),
rand::random(),
rand::random(),
rand::random(),
];
Md5 {
username: username.to_string(),
pool_name: pool_name.to_string(),
application_name: application_name.to_string(),
salt,
admin,
}
}
/// Issue a 'R' MD5 challenge to the client.
pub async fn challenge<W>(&self, stream: &mut W) -> Result<(), Error>
where
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send,
{
let mut res = BytesMut::new();
res.put_u8(b'R');
res.put_i32(12);
res.put_i32(5); // MD5
res.put_slice(&self.salt[..]);
write_all(stream, res).await
}
/// Authenticate client with MD5. This is used for both admin and normal users.
pub async fn authenticate<R, W>(&self, read: &mut R, write: &mut W) -> Result<(), Error>
where
R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send,
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send,
{
let password_hash = response(read).await?;
if self.admin {
let config = get_config();
// Compare server and client hashes.
let our_hash = md5_hash_password(
&config.general.admin_username,
&config.general.admin_password,
&self.salt,
);
if our_hash != password_hash {
wrong_password(write, &self.username).await?;
Err(Error::ClientError(format!(
"Invalid password {{ username: {}, pool_name: {}, application_name: {} }}",
self.username, self.pool_name, self.application_name
)))
} else {
Ok(())
}
} else {
match get_pool(&self.pool_name, &self.username, None) {
Some(pool) => {
match &pool.settings.user.password {
Some(ref password) => {
let our_hash = md5_hash_password(&self.username, password, &self.salt);
if our_hash != password_hash {
wrong_password(write, &self.username).await?;
Err(Error::ClientError(format!(
"Invalid password {{ username: {}, pool_name: {}, application_name: {} }}",
self.username, self.pool_name, self.application_name
)))
} else {
validate_pool(write, pool, &self.username, &self.pool_name).await?;
Ok(())
}
}
None => {
if !get_config().is_auth_query_configured() {
error_response(
write,
&format!(
"No password configured and auth_query is not set: {}, user: {}",
self.pool_name, self.username
),
)
.await?;
return Err(Error::ClientError(format!(
"No password configured and auth_query is not set"
)));
}
debug!("Using auth_query");
// Fetch hash from server
let hash = (*pool.auth_hash.read()).clone();
let hash = match hash {
Some(hash) => {
debug!("Using existing hash: {}", hash);
hash.clone()
}
None => {
debug!("Pool has no hash set, fetching new one");
let hash = refetch_auth_hash(
&pool,
write,
&self.username,
&self.pool_name,
)
.await?;
(*pool.auth_hash.write()) = Some(hash.clone());
hash
}
};
let our_hash = md5_hash_second_pass(&hash, &self.salt);
// Compare hashes
if our_hash != password_hash {
debug!("Pool auth query hash did not match, refetching");
// Server hash maybe changed
let hash = refetch_auth_hash(
&pool,
write,
&self.username,
&self.pool_name,
)
.await?;
let our_hash = md5_hash_second_pass(&hash, &self.salt);
if our_hash != password_hash {
debug!("Auth query failed, passwords don't match");
wrong_password(write, &self.username).await?;
Err(Error::ClientError(format!(
"Invalid password {{ username: {}, pool_name: {}, application_name: {} }}",
self.username, self.pool_name, self.application_name
)))
} else {
(*pool.auth_hash.write()) = Some(hash);
validate_pool(
write,
pool.clone(),
&self.username,
&self.pool_name,
)
.await?;
Ok(())
}
} else {
validate_pool(write, pool.clone(), &self.username, &self.pool_name)
.await?;
Ok(())
}
}
}
}
None => {
error_response(
write,
&format!(
"No pool configured for database: {}, user: {}",
self.pool_name, self.username
),
)
.await?;
return Err(Error::ClientError(format!(
"Invalid pool name {{ username: {}, pool_name: {}, application_name: {} }}",
self.username, self.pool_name, self.application_name
)));
}
}
}
}
}

View File

@@ -1,5 +1,4 @@
use crate::errors::Error;
use crate::pool::ConnectionPool;
use crate::server::Server;
use log::debug;
@@ -72,34 +71,23 @@ impl AuthPassthrough {
let auth_user = crate::config::User {
username: self.user.clone(),
password: Some(self.password.clone()),
server_username: None,
server_password: None,
pool_size: 1,
statement_timeout: 0,
pool_mode: None,
server_lifetime: None,
min_pool_size: None,
connect_timeout: None,
idle_timeout: None,
secrets: None,
};
let user = &address.username;
debug!("Connecting to server to obtain auth hashes");
debug!("Connecting to server to obtain auth hashes.");
let auth_query = self.query.replace("$1", user);
match Server::exec_simple_query(address, &auth_user, &auth_query).await {
Ok(password_data) => {
if password_data.len() == 2 && password_data.first().unwrap() == user {
if let Some(stripped_hash) = password_data
.last()
.unwrap()
.to_string()
.strip_prefix("md5") {
Ok(stripped_hash.to_string())
}
else {
if let Some(stripped_hash) = password_data.last().unwrap().to_string().strip_prefix("md5") {
Ok(stripped_hash.to_string())
} else {
Err(Error::AuthPassthroughError(
"Obtained hash from auth_query does not seem to be in md5 format.".to_string(),
))
@@ -111,26 +99,12 @@ impl AuthPassthrough {
))
}
}
Err(err) => {
Err(Error::AuthPassthroughError(
format!("Error trying to obtain password from auth_query, ignoring hash for user '{}'. Error: {:?}",
user, err))
)
user, err)))
}
}
}
}
}
pub async fn refetch_auth_hash(pool: &ConnectionPool) -> Result<String, Error> {
let address = pool.address(0, 0);
if let Some(apt) = AuthPassthrough::from_pool_settings(&pool.settings) {
let hash = apt.fetch_hash(address).await?;
return Ok(hash);
}
Err(Error::ClientError(format!(
"Could not obtain hash for {{ username: {:?}, database: {:?} }}. Auth passthrough not enabled.",
address.username, address.database
)))
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,36 +0,0 @@
use clap::{Parser, ValueEnum};
use tracing::Level;
/// PgCat: Nextgen PostgreSQL Pooler
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(default_value_t = String::from("pgcat.toml"), env)]
pub config_file: String,
#[arg(short, long, default_value_t = tracing::Level::INFO, env)]
pub log_level: Level,
#[clap(short='F', long, value_enum, default_value_t=LogFormat::Text, env)]
pub log_format: LogFormat,
#[arg(
short,
long,
default_value_t = false,
env,
help = "disable colors in the log output"
)]
pub no_color: bool,
}
pub fn parse() -> Args {
Args::parse()
}
#[derive(ValueEnum, Clone, Debug)]
pub enum LogFormat {
Text,
Structured,
Debug,
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,410 +0,0 @@
use crate::config::get_config;
use crate::errors::Error;
use arc_swap::ArcSwap;
use log::{debug, error, info, warn};
use once_cell::sync::Lazy;
use std::collections::{HashMap, HashSet};
use std::io;
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::time::{sleep, Duration};
use trust_dns_resolver::error::{ResolveError, ResolveResult};
use trust_dns_resolver::lookup_ip::LookupIp;
use trust_dns_resolver::TokioAsyncResolver;
/// Cached Resolver Globally available
pub static CACHED_RESOLVER: Lazy<ArcSwap<CachedResolver>> =
Lazy::new(|| ArcSwap::from_pointee(CachedResolver::default()));
// Ip addressed are returned as a set of addresses
// so we can compare.
#[derive(Clone, PartialEq, Debug)]
pub struct AddrSet {
set: HashSet<IpAddr>,
}
impl AddrSet {
fn new() -> AddrSet {
AddrSet {
set: HashSet::new(),
}
}
}
impl From<LookupIp> for AddrSet {
fn from(lookup_ip: LookupIp) -> Self {
let mut addr_set = AddrSet::new();
for address in lookup_ip.iter() {
addr_set.set.insert(address);
}
addr_set
}
}
///
/// A CachedResolver is a DNS resolution cache mechanism with customizable expiration time.
///
/// The system works as follows:
///
/// When a host is to be resolved, if we have not resolved it before, a new resolution is
/// executed and stored in the internal cache. Concurrently, every `dns_max_ttl` time, the
/// cache is refreshed.
///
/// # Example:
///
/// ```
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
///
/// # tokio_test::block_on(async {
/// let config = CachedResolverConfig::default();
/// let resolver = CachedResolver::new(config, None).await.unwrap();
/// let addrset = resolver.lookup_ip("www.example.com.").await.unwrap();
/// # })
/// ```
///
/// // Now the ip resolution is stored in local cache and subsequent
/// // calls will be returned from cache. Also, the cache is refreshed
/// // and updated every 10 seconds.
///
/// // You can now check if an 'old' lookup differs from what it's currently
/// // store in cache by using `has_changed`.
/// resolver.has_changed("www.example.com.", addrset)
#[derive(Default)]
pub struct CachedResolver {
// The configuration of the cached_resolver.
config: CachedResolverConfig,
// This is the hash that contains the hash.
data: Option<RwLock<HashMap<String, AddrSet>>>,
// The resolver to be used for DNS queries.
resolver: Option<TokioAsyncResolver>,
// The RefreshLoop
refresh_loop: RwLock<Option<tokio::task::JoinHandle<()>>>,
}
///
/// Configuration
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CachedResolverConfig {
/// Amount of time in secods that a resolved dns address is considered stale.
dns_max_ttl: u64,
/// Enabled or disabled? (this is so we can reload config)
enabled: bool,
}
impl CachedResolverConfig {
fn new(dns_max_ttl: u64, enabled: bool) -> Self {
CachedResolverConfig {
dns_max_ttl,
enabled,
}
}
}
impl From<crate::config::Config> for CachedResolverConfig {
fn from(config: crate::config::Config) -> Self {
CachedResolverConfig::new(config.general.dns_max_ttl, config.general.dns_cache_enabled)
}
}
impl CachedResolver {
///
/// Returns a new Arc<CachedResolver> based on passed configuration.
/// It also starts the loop that will refresh cache entries.
///
/// # Arguments:
///
/// * `config` - The `CachedResolverConfig` to be used to create the resolver.
///
/// # Example:
///
/// ```
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
///
/// # tokio_test::block_on(async {
/// let config = CachedResolverConfig::default();
/// let resolver = CachedResolver::new(config, None).await.unwrap();
/// # })
/// ```
///
pub async fn new(
config: CachedResolverConfig,
data: Option<HashMap<String, AddrSet>>,
) -> Result<Arc<Self>, io::Error> {
// Construct a new Resolver with default configuration options
let resolver = Some(TokioAsyncResolver::tokio_from_system_conf()?);
let data = if let Some(hash) = data {
Some(RwLock::new(hash))
} else {
Some(RwLock::new(HashMap::new()))
};
let instance = Arc::new(Self {
config,
resolver,
data,
refresh_loop: RwLock::new(None),
});
if instance.enabled() {
info!("Scheduling DNS refresh loop");
let refresh_loop = tokio::task::spawn({
let instance = instance.clone();
async move {
instance.refresh_dns_entries_loop().await;
}
});
*(instance.refresh_loop.write().unwrap()) = Some(refresh_loop);
}
Ok(instance)
}
pub fn enabled(&self) -> bool {
self.config.enabled
}
// Schedules the refresher
async fn refresh_dns_entries_loop(&self) {
let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap();
let interval = Duration::from_secs(self.config.dns_max_ttl);
loop {
debug!("Begin refreshing cached DNS addresses.");
// To minimize the time we hold the lock, we first create
// an array with keys.
let mut hostnames: Vec<String> = Vec::new();
{
if let Some(ref data) = self.data {
for hostname in data.read().unwrap().keys() {
hostnames.push(hostname.clone());
}
}
}
for hostname in hostnames.iter() {
let addrset = self
.fetch_from_cache(hostname.as_str())
.expect("Could not obtain expected address from cache, this should not happen");
match resolver.lookup_ip(hostname).await {
Ok(lookup_ip) => {
let new_addrset = AddrSet::from(lookup_ip);
debug!(
"Obtained address for host ({}) -> ({:?})",
hostname, new_addrset
);
if addrset != new_addrset {
debug!(
"Addr changed from {:?} to {:?} updating cache.",
addrset, new_addrset
);
self.store_in_cache(hostname, new_addrset);
}
}
Err(err) => {
error!(
"There was an error trying to resolv {}: ({}).",
hostname, err
);
}
}
}
debug!("Finished refreshing cached DNS addresses.");
sleep(interval).await;
}
}
/// Returns a `AddrSet` given the specified hostname.
///
/// This method first tries to fetch the value from the cache, if it misses
/// then it is resolved and stored in the cache. TTL from records is ignored.
///
/// # Arguments
///
/// * `host` - A string slice referencing the hostname to be resolved.
///
/// # Example:
///
/// ```
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
///
/// # tokio_test::block_on(async {
/// let config = CachedResolverConfig::default();
/// let resolver = CachedResolver::new(config, None).await.unwrap();
/// let response = resolver.lookup_ip("www.google.com.");
/// # })
/// ```
///
pub async fn lookup_ip(&self, host: &str) -> ResolveResult<AddrSet> {
debug!("Lookup up {} in cache", host);
match self.fetch_from_cache(host) {
Some(addr_set) => {
debug!("Cache hit!");
Ok(addr_set)
}
None => {
debug!("Not found, executing a dns query!");
if let Some(ref resolver) = self.resolver {
let addr_set = AddrSet::from(resolver.lookup_ip(host).await?);
debug!("Obtained: {:?}", addr_set);
self.store_in_cache(host, addr_set.clone());
Ok(addr_set)
} else {
Err(ResolveError::from("No resolver available"))
}
}
}
}
//
// Returns true if the stored host resolution differs from the AddrSet passed.
pub fn has_changed(&self, host: &str, addr_set: &AddrSet) -> bool {
if let Some(fetched_addr_set) = self.fetch_from_cache(host) {
return fetched_addr_set != *addr_set;
}
false
}
// Fetches an AddrSet from the inner cache adquiring the read lock.
fn fetch_from_cache(&self, key: &str) -> Option<AddrSet> {
if let Some(ref hash) = self.data {
if let Some(addr_set) = hash.read().unwrap().get(key) {
return Some(addr_set.clone());
}
}
None
}
// Sets up the global CACHED_RESOLVER static variable so we can globally use DNS
// cache.
pub async fn from_config() -> Result<(), Error> {
let cached_resolver = CACHED_RESOLVER.load();
let desired_config = CachedResolverConfig::from(get_config());
if cached_resolver.config != desired_config {
if let Some(ref refresh_loop) = *(cached_resolver.refresh_loop.write().unwrap()) {
warn!("Killing Dnscache refresh loop as its configuration is being reloaded");
refresh_loop.abort()
}
let new_resolver = if let Some(ref data) = cached_resolver.data {
let data = Some(data.read().unwrap().clone());
CachedResolver::new(desired_config, data).await
} else {
CachedResolver::new(desired_config, None).await
};
match new_resolver {
Ok(ok) => {
CACHED_RESOLVER.store(ok);
Ok(())
}
Err(err) => {
let message = format!("Error setting up cached_resolver. Error: {:?}, will continue without this feature.", err);
Err(Error::DNSCachedError(message))
}
}
} else {
Ok(())
}
}
// Stores the AddrSet in cache adquiring the write lock.
fn store_in_cache(&self, host: &str, addr_set: AddrSet) {
if let Some(ref data) = self.data {
data.write().unwrap().insert(host.to_string(), addr_set);
} else {
error!("Could not insert, Hash not initialized");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use trust_dns_resolver::error::ResolveError;
#[tokio::test]
async fn new() {
let config = CachedResolverConfig {
dns_max_ttl: 10,
enabled: true,
};
let resolver = CachedResolver::new(config, None).await;
assert!(resolver.is_ok());
}
#[tokio::test]
async fn lookup_ip() {
let config = CachedResolverConfig {
dns_max_ttl: 10,
enabled: true,
};
let resolver = CachedResolver::new(config, None).await.unwrap();
let response = resolver.lookup_ip("www.google.com.").await;
assert!(response.is_ok());
}
#[tokio::test]
async fn has_changed() {
let config = CachedResolverConfig {
dns_max_ttl: 10,
enabled: true,
};
let resolver = CachedResolver::new(config, None).await.unwrap();
let hostname = "www.google.com.";
let response = resolver.lookup_ip(hostname).await;
let addr_set = response.unwrap();
assert!(!resolver.has_changed(hostname, &addr_set));
}
#[tokio::test]
async fn unknown_host() {
let config = CachedResolverConfig {
dns_max_ttl: 10,
enabled: true,
};
let resolver = CachedResolver::new(config, None).await.unwrap();
let hostname = "www.idontexists.";
let response = resolver.lookup_ip(hostname).await;
assert!(matches!(response, Err(ResolveError { .. })));
}
#[tokio::test]
async fn incorrect_address() {
let config = CachedResolverConfig {
dns_max_ttl: 10,
enabled: true,
};
let resolver = CachedResolver::new(config, None).await.unwrap();
let hostname = "w ww.idontexists.";
let response = resolver.lookup_ip(hostname).await;
assert!(matches!(response, Err(ResolveError { .. })));
assert!(!resolver.has_changed(hostname, &AddrSet::new()));
}
#[tokio::test]
// Ok, this test is based on the fact that google does DNS RR
// and does not responds with every available ip everytime, so
// if I cache here, it will miss after one cache iteration or two.
async fn thread() {
let config = CachedResolverConfig {
dns_max_ttl: 10,
enabled: true,
};
let resolver = CachedResolver::new(config, None).await.unwrap();
let hostname = "www.google.com.";
let response = resolver.lookup_ip(hostname).await;
let addr_set = response.unwrap();
assert!(!resolver.has_changed(hostname, &addr_set));
let resolver_for_refresher = resolver.clone();
let _thread_handle = tokio::task::spawn(async move {
resolver_for_refresher.refresh_dns_entries_loop().await;
});
assert!(!resolver.has_changed(hostname, &addr_set));
}
}

View File

@@ -1,133 +1,20 @@
//! Errors.
/// Errors.
/// Various errors.
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub enum Error {
SocketError(String),
ClientSocketError(String, ClientIdentifier),
ClientGeneralError(String, ClientIdentifier),
ClientAuthImpossible(String),
ClientAuthPassthroughError(String, ClientIdentifier),
ClientBadStartup,
ProtocolSyncError(String),
BadQuery(String),
ServerError,
ServerMessageParserError(String),
ServerStartupError(String, ServerIdentifier),
ServerAuthError(String, ServerIdentifier),
BadConfig,
AllServersDown,
ClientError(String),
TlsError,
StatementTimeout,
DNSCachedError(String),
ShuttingDown,
ParseBytesError(String),
AuthError(String),
AuthPassthroughError(String),
UnsupportedStatement,
QueryRouterParserError(String),
QueryRouterError(String),
InvalidShardId(usize),
PreparedStatementError,
}
#[derive(Clone, PartialEq, Debug)]
pub struct ClientIdentifier {
pub application_name: String,
pub username: String,
pub pool_name: String,
}
impl ClientIdentifier {
pub fn new(application_name: &str, username: &str, pool_name: &str) -> ClientIdentifier {
ClientIdentifier {
application_name: application_name.into(),
username: username.into(),
pool_name: pool_name.into(),
}
}
}
impl std::fmt::Display for ClientIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{{ application_name: {}, username: {}, pool_name: {} }}",
self.application_name, self.username, self.pool_name
)
}
}
#[derive(Clone, PartialEq, Debug)]
pub struct ServerIdentifier {
pub username: String,
pub database: String,
}
impl ServerIdentifier {
pub fn new(username: &str, database: &str) -> ServerIdentifier {
ServerIdentifier {
username: username.into(),
database: database.into(),
}
}
}
impl std::fmt::Display for ServerIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{{ username: {}, database: {} }}",
self.username, self.database
)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match &self {
&Error::ClientSocketError(error, client_identifier) => write!(
f,
"Error reading {} from client {}",
error, client_identifier
),
&Error::ClientGeneralError(error, client_identifier) => {
write!(f, "{} {}", error, client_identifier)
}
&Error::ClientAuthImpossible(username) => write!(
f,
"Client auth not possible, \
no cleartext password set for username: {} \
in config and auth passthrough (query_auth) \
is not set up.",
username
),
&Error::ClientAuthPassthroughError(error, client_identifier) => write!(
f,
"No cleartext password set, \
and no auth passthrough could not \
obtain the hash from server for {}, \
the error was: {}",
client_identifier, error
),
&Error::ServerStartupError(error, server_identifier) => write!(
f,
"Error reading {} on server startup {}",
error, server_identifier,
),
&Error::ServerAuthError(error, server_identifier) => {
write!(f, "{} for {}", error, server_identifier,)
}
// The rest can use Debug.
err => write!(f, "{:?}", err),
}
}
}
impl From<std::ffi::NulError> for Error {
fn from(err: std::ffi::NulError) -> Self {
Error::QueryRouterError(err.to_string())
}
}

View File

@@ -1,18 +1,11 @@
pub mod admin;
pub mod auth_passthrough;
pub mod client;
pub mod cmd_args;
pub mod config;
pub mod constants;
pub mod dns_cache;
pub mod errors;
pub mod logger;
pub mod messages;
pub mod mirrors;
pub mod plugins;
pub mod multi_logger;
pub mod pool;
pub mod prometheus;
pub mod query_router;
pub mod scram;
pub mod server;
pub mod sharding;

View File

@@ -1,20 +0,0 @@
use crate::cmd_args::{Args, LogFormat};
use tracing_subscriber;
use tracing_subscriber::EnvFilter;
pub fn init(args: &Args) {
// Iniitalize a default filter, and then override the builtin default "warning" with our
// commandline, (default: "info")
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());
let trace_sub = tracing_subscriber::fmt()
.with_thread_ids(true)
.with_env_filter(filter)
.with_ansi(!args.no_color);
match args.log_format {
LogFormat::Structured => trace_sub.json().init(),
LogFormat::Debug => trace_sub.pretty().init(),
_ => trace_sub.init(),
};
}

View File

@@ -23,6 +23,7 @@ extern crate arc_swap;
extern crate async_trait;
extern crate bb8;
extern crate bytes;
extern crate env_logger;
extern crate exitcode;
extern crate log;
extern crate md5;
@@ -35,7 +36,6 @@ extern crate sqlparser;
extern crate tokio;
extern crate tokio_rustls;
extern crate toml;
extern crate trust_dns_resolver;
#[cfg(not(target_env = "msvc"))]
use jemallocator::Jemalloc;
@@ -60,32 +60,54 @@ use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::broadcast;
use pgcat::cmd_args;
use pgcat::config::{get_config, reload_config, VERSION};
use pgcat::dns_cache;
use pgcat::logger;
use pgcat::messages::configure_socket;
use pgcat::pool::{ClientServerMap, ConnectionPool};
use pgcat::prometheus::start_metric_server;
use pgcat::stats::{Collector, Reporter, REPORTER};
mod admin;
mod auth;
mod auth_passthrough;
mod client;
mod config;
mod constants;
mod errors;
mod messages;
mod mirrors;
mod multi_logger;
mod pool;
mod prometheus;
mod query_router;
mod scram;
mod server;
mod sharding;
mod stats;
mod tls;
use crate::config::{get_config, reload_config, VERSION};
use crate::pool::{ClientServerMap, ConnectionPool};
use crate::prometheus::start_metric_server;
use crate::stats::{Collector, Reporter, REPORTER};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = cmd_args::parse();
logger::init(&args);
multi_logger::MultiLogger::init().unwrap();
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
if !pgcat::query_router::QueryRouter::setup() {
if !query_router::QueryRouter::setup() {
error!("Could not setup query router");
std::process::exit(exitcode::CONFIG);
}
let args = std::env::args().collect::<Vec<String>>();
let config_file = if args.len() == 2 {
args[1].to_string()
} else {
String::from("pgcat.toml")
};
// Create a transient runtime for loading the config for the first time.
{
let runtime = Builder::new_multi_thread().worker_threads(1).build()?;
runtime.block_on(async {
match pgcat::config::parse(args.config_file.as_str()).await {
match config::parse(&config_file).await {
Ok(_) => (),
Err(err) => {
error!("Config parse error: {:?}", err);
@@ -144,12 +166,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Statistics reporting.
REPORTER.store(Arc::new(Reporter::default()));
// Starts (if enabled) dns cache before pools initialization
match dns_cache::CachedResolver::from_config().await {
Ok(_) => (),
Err(err) => error!("DNS cache initialization error: {:?}", err),
};
// Connection pool that allows to query all shards and replicas.
match ConnectionPool::from_config(client_server_map.clone()).await {
Ok(_) => (),
@@ -164,19 +180,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
stats_collector.collect().await;
});
info!("Config autoreloader: {}", match config.general.autoreload {
Some(interval) => format!("{} ms", interval),
None => "disabled".into(),
});
info!("Config autoreloader: {}", config.general.autoreload);
if let Some(interval) = config.general.autoreload {
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(interval));
let autoreload_client_server_map = client_server_map.clone();
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
let autoreload_client_server_map = client_server_map.clone();
tokio::task::spawn(async move {
loop {
autoreload_interval.tick().await;
debug!("Automatically reloading config");
tokio::task::spawn(async move {
loop {
autoreload_interval.tick().await;
if config.general.autoreload {
info!("Automatically reloading config");
if let Ok(changed) = reload_config(autoreload_client_server_map.clone()).await {
if changed {
@@ -184,10 +197,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};
}
});
};
}
});
#[cfg(windows)]
let mut term_signal = win_signal::ctrl_close().unwrap();
@@ -272,20 +283,18 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let drain_tx = drain_tx.clone();
let client_server_map = client_server_map.clone();
let tls_certificate = get_config().general.tls_certificate.clone();
configure_socket(&socket);
let tls_certificate = config.general.tls_certificate.clone();
tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();
match pgcat::client::client_entrypoint(
match client::client_entrypoint(
socket,
client_server_map,
shutdown_rx,
drain_tx,
admin_only,
tls_certificate,
tls_certificate.clone(),
config.general.log_client_connections,
)
.await
@@ -293,7 +302,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) => {
let duration = chrono::offset::Utc::now().naive_utc() - start;
if get_config().general.log_client_disconnections {
if config.general.log_client_disconnections {
info!(
"Client {:?} disconnected, session duration: {}",
addr,
@@ -310,7 +319,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Err(err) => {
match err {
pgcat::errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
_ => warn!("Client disconnected with error {:?}", err),
}

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,8 @@ use bytes::{Bytes, BytesMut};
use parking_lot::RwLock;
use crate::config::{get_config, Address, Role, User};
use crate::pool::{ClientServerMap, ServerPool};
use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool};
use crate::stats::PoolStats;
use log::{error, info, trace, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -23,27 +24,25 @@ impl MirroredClient {
async fn create_pool(&self) -> Pool<ServerPool> {
let config = get_config();
let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
let (connection_timeout, idle_timeout, _cfg, prepared_statement_cache_size) =
let (connection_timeout, idle_timeout, cfg) =
match config.pools.get(&self.address.pool_name) {
Some(cfg) => (
cfg.connect_timeout.unwrap_or(default),
cfg.idle_timeout.unwrap_or(default),
cfg.clone(),
cfg.prepared_statements_cache_size,
),
None => (default, default, crate::config::Pool::default(), 0),
None => (default, default, crate::config::Pool::default()),
};
let identifier = PoolIdentifier::new(&self.database, &self.user.username, None);
let manager = ServerPool::new(
self.address.clone(),
self.user.clone(),
self.database.as_str(),
ClientServerMap::default(),
Arc::new(PoolStats::new(identifier, cfg.clone())),
Arc::new(RwLock::new(None)),
None,
true,
false,
prepared_statement_cache_size,
);
Pool::builder()
@@ -81,13 +80,12 @@ impl MirroredClient {
}
// Incoming data from server (we read to clear the socket buffer and discard the data)
recv_result = server.recv(None) => {
recv_result = server.recv() => {
match recv_result {
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()).as_str()
);
server.mark_bad();
error!("Failed to receive from mirror {:?} {:?}", err, address.clone());
}
}
}
@@ -99,9 +97,8 @@ impl MirroredClient {
match server.send(&BytesMut::from(&bytes[..])).await {
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to receive from mirror {:?} {:?}", err, address.clone()).as_str()
);
server.mark_bad();
error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone())
}
}
}
@@ -141,18 +138,18 @@ impl MirroringManager {
bytes_rx,
disconnect_rx: exit_rx,
};
exit_senders.push(exit_tx);
byte_senders.push(bytes_tx);
exit_senders.push(exit_tx.clone());
byte_senders.push(bytes_tx.clone());
client.start();
});
Self {
byte_senders,
byte_senders: byte_senders,
disconnect_senders: exit_senders,
}
}
pub fn send(&mut self, bytes: &BytesMut) {
pub fn send(self: &mut Self, bytes: &BytesMut) {
// We want to avoid performing an allocation if we won't be able to send the message
// There is a possibility of a race here where we check the capacity and then the channel is
// closed or the capacity is reduced to 0, but mirroring is best effort anyway
@@ -174,7 +171,7 @@ impl MirroringManager {
});
}
pub fn disconnect(&mut self) {
pub fn disconnect(self: &mut Self) {
self.disconnect_senders
.iter_mut()
.for_each(|sender| match sender.try_send(()) {

80
src/multi_logger.rs Normal file
View File

@@ -0,0 +1,80 @@
use log::{Level, Log, Metadata, Record, SetLoggerError};
// This is a special kind of logger that allows sending logs to different
// targets depending on the log level.
//
// By default, if nothing is set, it acts as a regular env_log logger,
// it sends everything to standard error.
//
// If the Env variable `STDOUT_LOG` is defined, it will be used for
// configuring the standard out logger.
//
// The behavior is:
// - If it is an error, the message is written to standard error.
// - If it is not, and it matches the log level of the standard output logger (`STDOUT_LOG` env var), it will be send to standard output.
// - If the above is not true, it is sent to the stderr logger that will log it or not depending on the value
// of the RUST_LOG env var.
//
// So to summarize, if no `STDOUT_LOG` env var is present, the logger is the default logger. If `STDOUT_LOG` is set, everything
// but errors, that matches the log level set in the `STDOUT_LOG` env var is sent to stdout. You can have also some esoteric configuration
// where you set `RUST_LOG=debug` and `STDOUT_LOG=info`, in here, erros will go to stderr, warns and infos to stdout and debugs to stderr.
//
pub struct MultiLogger {
stderr_logger: env_logger::Logger,
stdout_logger: env_logger::Logger,
}
impl MultiLogger {
fn new() -> Self {
let stderr_logger = env_logger::builder().format_timestamp_micros().build();
let stdout_logger = env_logger::Builder::from_env("STDOUT_LOG")
.format_timestamp_micros()
.target(env_logger::Target::Stdout)
.build();
Self {
stderr_logger,
stdout_logger,
}
}
pub fn init() -> Result<(), SetLoggerError> {
let logger = Self::new();
log::set_max_level(logger.stderr_logger.filter());
log::set_boxed_logger(Box::new(logger))
}
}
impl Log for MultiLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
self.stderr_logger.enabled(metadata) && self.stdout_logger.enabled(metadata)
}
fn log(&self, record: &Record) {
if record.level() == Level::Error {
self.stderr_logger.log(record);
} else {
if self.stdout_logger.matches(record) {
self.stdout_logger.log(record);
} else {
self.stderr_logger.log(record);
}
}
}
fn flush(&self) {
self.stderr_logger.flush();
self.stdout_logger.flush();
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_init() {
MultiLogger::init().unwrap();
}
}

View File

@@ -1,120 +0,0 @@
//! The intercept plugin.
//!
//! It intercepts queries and returns fake results.
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use sqlparser::ast::Statement;
use log::debug;
use crate::{
config::Intercept as InterceptConfig,
errors::Error,
messages::{command_complete, data_row_nullable, row_description, DataType},
plugins::{Plugin, PluginOutput},
query_router::QueryRouter,
};
// TODO: use these structs for deserialization
#[derive(Serialize, Deserialize)]
pub struct Rule {
query: String,
schema: Vec<Column>,
result: Vec<Vec<String>>,
}
#[derive(Serialize, Deserialize)]
pub struct Column {
name: String,
data_type: String,
}
/// The intercept plugin.
pub struct Intercept<'a> {
pub enabled: bool,
pub config: &'a InterceptConfig,
}
#[async_trait]
impl<'a> Plugin for Intercept<'a> {
async fn run(
&mut self,
query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error> {
if !self.enabled || ast.is_empty() {
return Ok(PluginOutput::Allow);
}
let mut config = self.config.clone();
config.substitute(
&query_router.pool_settings().db,
&query_router.pool_settings().user.username,
);
let mut result = BytesMut::new();
for q in ast {
// Normalization
let q = q.to_string().to_ascii_lowercase();
for (_, target) in config.queries.iter() {
if target.query.as_str() == q {
debug!("Intercepting query: {}", q);
let rd = target
.schema
.iter()
.map(|row| {
let name = &row[0];
let data_type = &row[1];
(
name.as_str(),
match data_type.as_str() {
"text" => DataType::Text,
"anyarray" => DataType::AnyArray,
"oid" => DataType::Oid,
"bool" => DataType::Bool,
"int4" => DataType::Int4,
_ => DataType::Any,
},
)
})
.collect::<Vec<(&str, DataType)>>();
result.put(row_description(&rd));
target.result.iter().for_each(|row| {
let row = row
.iter()
.map(|s| {
let s = s.as_str().to_string();
if s.is_empty() {
None
} else {
Some(s)
}
})
.collect::<Vec<Option<String>>>();
result.put(data_row_nullable(&row));
});
result.put(command_complete("SELECT"));
}
}
}
if !result.is_empty() {
result.put_u8(b'Z');
result.put_i32(5);
result.put_u8(b'I');
return Ok(PluginOutput::Intercept(result));
} else {
Ok(PluginOutput::Allow)
}
}
}

View File

@@ -1,45 +0,0 @@
//! The plugin ecosystem.
//!
//! Currently plugins only grant access or deny access to the database for a particual query.
//! Example use cases:
//! - block known bad queries
//! - block access to system catalogs
//! - block dangerous modifications like `DROP TABLE`
//! - etc
//!
pub mod intercept;
pub mod prewarmer;
pub mod query_logger;
pub mod table_access;
use crate::{errors::Error, query_router::QueryRouter};
use async_trait::async_trait;
use bytes::BytesMut;
use sqlparser::ast::Statement;
pub use intercept::Intercept;
pub use query_logger::QueryLogger;
pub use table_access::TableAccess;
#[derive(Clone, Debug, PartialEq)]
pub enum PluginOutput {
Allow,
Deny(String),
Overwrite(Vec<Statement>),
Intercept(BytesMut),
}
#[async_trait]
pub trait Plugin {
// Run before the query is sent to the server.
#[allow(clippy::ptr_arg)]
async fn run(
&mut self,
query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error>;
// TODO: run after the result is returned
// async fn callback(&mut self, query_router: &QueryRouter);
}

View File

@@ -1,28 +0,0 @@
//! Prewarm new connections before giving them to the client.
use crate::{errors::Error, server::Server};
use log::info;
pub struct Prewarmer<'a> {
pub enabled: bool,
pub server: &'a mut Server,
pub queries: &'a Vec<String>,
}
impl<'a> Prewarmer<'a> {
pub async fn run(&mut self) -> Result<(), Error> {
if !self.enabled {
return Ok(());
}
for query in self.queries {
info!(
"{} Prewarning with query: `{}`",
self.server.address(),
query
);
self.server.query(query).await?;
}
Ok(())
}
}

View File

@@ -1,38 +0,0 @@
//! Log all queries to stdout (or somewhere else, why not).
use crate::{
errors::Error,
plugins::{Plugin, PluginOutput},
query_router::QueryRouter,
};
use async_trait::async_trait;
use log::info;
use sqlparser::ast::Statement;
pub struct QueryLogger<'a> {
pub enabled: bool,
pub user: &'a str,
pub db: &'a str,
}
#[async_trait]
impl<'a> Plugin for QueryLogger<'a> {
async fn run(
&mut self,
_query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error> {
if !self.enabled {
return Ok(PluginOutput::Allow);
}
let query = ast
.iter()
.map(|q| q.to_string())
.collect::<Vec<String>>()
.join("; ");
info!("[pool: {}][user: {}] {}", self.db, self.user, query);
Ok(PluginOutput::Allow)
}
}

View File

@@ -1,59 +0,0 @@
//! This query router plugin will check if the user can access a particular
//! table as part of their query. If they can't, the query will not be routed.
use async_trait::async_trait;
use sqlparser::ast::{visit_relations, Statement};
use crate::{
errors::Error,
plugins::{Plugin, PluginOutput},
query_router::QueryRouter,
};
use log::debug;
use core::ops::ControlFlow;
pub struct TableAccess<'a> {
pub enabled: bool,
pub tables: &'a Vec<String>,
}
#[async_trait]
impl<'a> Plugin for TableAccess<'a> {
async fn run(
&mut self,
_query_router: &QueryRouter,
ast: &Vec<Statement>,
) -> Result<PluginOutput, Error> {
if !self.enabled {
return Ok(PluginOutput::Allow);
}
let mut found = None;
visit_relations(ast, |relation| {
let relation = relation.to_string();
let parts = relation.split('.').collect::<Vec<&str>>();
let table_name = parts.last().unwrap();
if self.tables.contains(&table_name.to_string()) {
found = Some(table_name.to_string());
ControlFlow::<()>::Break(())
} else {
ControlFlow::<()>::Continue(())
}
});
if let Some(found) = found {
debug!("Blocking access to table \"{}\"", found);
Ok(PluginOutput::Deny(format!(
"permission for table \"{}\" denied",
found
)))
} else {
Ok(PluginOutput::Allow)
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{debug, error, info};
use log::{error, info, warn};
use phf::phf_map;
use std::collections::HashMap;
use std::fmt;
@@ -10,8 +10,7 @@ use std::sync::Arc;
use crate::config::Address;
use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};
use crate::stats::{get_pool_stats, get_server_stats, ServerStats};
struct MetricHelpType {
help: &'static str,
@@ -234,10 +233,10 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("stats_{}", name), value, labels)
}
fn from_pool(pool_id: PoolIdentifier, name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
fn from_pool(pool: &PoolIdentifier, name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
let mut labels = HashMap::new();
labels.insert("pool", pool_id.db);
labels.insert("user", pool_id.user);
labels.insert("pool", pool.db.clone());
labels.insert("user", pool.user.clone());
Self::from_name(&format!("pools_{}", name), value, labels)
}
@@ -275,7 +274,7 @@ fn push_address_stats(lines: &mut Vec<String>) {
{
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
warn!("Metric {} not implemented for {}", key, address.name());
}
}
}
@@ -285,15 +284,18 @@ fn push_address_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() {
let pool_stats = get_pool_stats();
for (pool, stats) in pool_stats.iter() {
let stats = &**stats;
for (name, value) in stats.clone() {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
if let Some(prometheus_metric) = PrometheusMetric::<u64>::from_pool(pool, &name, value)
{
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for ({})", name, *pool_id);
warn!(
"Metric {} not implemented for ({},{})",
name, pool.db, pool.user
);
}
}
}
@@ -318,7 +320,7 @@ fn push_database_stats(lines: &mut Vec<String>) {
{
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
warn!("Metric {} not implemented for {}", key, address.name());
}
}
}
@@ -364,7 +366,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
{
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
warn!("Metric {} not implemented for {}", key, address.name());
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -79,12 +79,12 @@ impl ScramSha256 {
let server_message = Message::parse(message)?;
if !server_message.nonce.starts_with(&self.nonce) {
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
return Err(Error::ProtocolSyncError(format!("SCRAM")));
}
let salt = match general_purpose::STANDARD.decode(&server_message.salt) {
Ok(salt) => salt,
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
};
let salted_password = Self::hi(
@@ -166,9 +166,9 @@ impl ScramSha256 {
pub fn finish(&mut self, message: &BytesMut) -> Result<(), Error> {
let final_message = FinalMessage::parse(message)?;
let verifier = match general_purpose::STANDARD.decode(final_message.value) {
let verifier = match general_purpose::STANDARD.decode(&final_message.value) {
Ok(verifier) => verifier,
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
};
let mut hmac = match Hmac::<Sha256>::new_from_slice(&self.salted_password) {
@@ -230,14 +230,14 @@ impl Message {
.collect::<Vec<String>>();
if parts.len() != 3 {
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
return Err(Error::ProtocolSyncError(format!("SCRAM")));
}
let nonce = str::replace(&parts[0], "r=", "");
let salt = str::replace(&parts[1], "s=", "");
let iterations = match str::replace(&parts[2], "i=", "").parse::<u32>() {
Ok(iterations) => iterations,
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
};
Ok(Message {
@@ -257,7 +257,7 @@ impl FinalMessage {
/// Parse the server final validation message.
pub fn parse(message: &BytesMut) -> Result<FinalMessage, Error> {
if !message.starts_with(b"v=") || message.len() < 4 {
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
return Err(Error::ProtocolSyncError(format!("SCRAM")));
}
Ok(FinalMessage {

File diff suppressed because it is too large Load Diff

View File

@@ -64,7 +64,7 @@ impl Sharder {
fn sha1(&self, key: i64) -> usize {
let mut hasher = Sha1::new();
hasher.update(key.to_string().as_bytes());
hasher.update(&key.to_string().as_bytes());
let result = hasher.finalize();
@@ -202,10 +202,10 @@ mod test {
#[test]
fn test_sha1_hash() {
let sharder = Sharder::new(12, ShardingFunction::Sha1);
let ids = [
let ids = vec![
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let shards = [
let shards = vec![
4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3,
];

View File

@@ -1,3 +1,4 @@
use crate::pool::PoolIdentifier;
/// Statistics and reporting.
use arc_swap::ArcSwap;
@@ -15,11 +16,13 @@ pub mod pool;
pub mod server;
pub use address::AddressStats;
pub use client::{ClientState, ClientStats};
pub use pool::PoolStats;
pub use server::{ServerState, ServerStats};
/// Convenience types for various stats
type ClientStatesLookup = HashMap<i32, Arc<ClientStats>>;
type ServerStatesLookup = HashMap<i32, Arc<ServerStats>>;
type PoolStatsLookup = HashMap<PoolIdentifier, Arc<PoolStats>>;
/// Stats for individual client connections
/// Used in SHOW CLIENTS.
@@ -31,6 +34,11 @@ static CLIENT_STATS: Lazy<Arc<RwLock<ClientStatesLookup>>> =
static SERVER_STATS: Lazy<Arc<RwLock<ServerStatesLookup>>> =
Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default())));
/// Aggregate stats for each pool (a pool is identified by database name and username)
/// Used in SHOW POOLS.
static POOL_STATS: Lazy<Arc<RwLock<PoolStatsLookup>>> =
Lazy::new(|| Arc::new(RwLock::new(PoolStatsLookup::default())));
/// The statistics reporter. An instance is given to each possible source of statistics,
/// e.g. client stats, server stats, connection pool stats.
pub static REPORTER: Lazy<ArcSwap<Reporter>> =
@@ -58,7 +66,7 @@ impl Reporter {
CLIENT_STATS.write().insert(client_id, stats);
}
/// Reports a client is disconnecting from the pooler.
/// Reports a client is disconecting from the pooler.
fn client_disconnecting(&self, client_id: i32) {
CLIENT_STATS.write().remove(&client_id);
}
@@ -68,10 +76,15 @@ impl Reporter {
fn server_register(&self, server_id: i32, stats: Arc<ServerStats>) {
SERVER_STATS.write().insert(server_id, stats);
}
/// Reports a server connection is disconnecting from the pooler.
/// Reports a server connection is disconecting from the pooler.
fn server_disconnecting(&self, server_id: i32) {
SERVER_STATS.write().remove(&server_id);
}
/// Register a pool with the stats system.
fn pool_register(&self, identifier: PoolIdentifier, stats: Arc<PoolStats>) {
POOL_STATS.write().insert(identifier, stats);
}
}
/// The statistics collector which used for calculating averages
@@ -92,20 +105,8 @@ impl Collector {
loop {
interval.tick().await;
// Hold read lock for duration of update to retain all server stats
let server_stats = SERVER_STATS.read();
for stats in server_stats.values() {
if !stats.check_address_stat_average_is_updated_status() {
stats.address_stats().update_averages();
stats.address_stats().reset_current_counts();
stats.set_address_stat_average_is_updated_status(true);
}
}
// Reset to false for next update
for stats in server_stats.values() {
stats.set_address_stat_average_is_updated_status(false);
for stats in SERVER_STATS.read().values() {
stats.address_stats().update_averages();
}
}
});
@@ -124,6 +125,12 @@ pub fn get_server_stats() -> ServerStatesLookup {
SERVER_STATS.read().clone()
}
/// Get a snapshot of pool statistics.
/// by the `Collector`.
pub fn get_pool_stats() -> PoolStatsLookup {
POOL_STATS.read().clone()
}
/// Get the statistics reporter used to update stats across the pools/clients.
pub fn get_reporter() -> Reporter {
(*(*REPORTER.load())).clone()

View File

@@ -1,29 +1,26 @@
use log::warn;
use std::sync::atomic::*;
use std::sync::Arc;
#[derive(Debug, Clone, Default)]
struct AddressStatFields {
xact_count: Arc<AtomicU64>,
query_count: Arc<AtomicU64>,
bytes_received: Arc<AtomicU64>,
bytes_sent: Arc<AtomicU64>,
xact_time: Arc<AtomicU64>,
query_time: Arc<AtomicU64>,
wait_time: Arc<AtomicU64>,
errors: Arc<AtomicU64>,
}
/// Internal address stats
#[derive(Debug, Clone, Default)]
pub struct AddressStats {
total: AddressStatFields,
current: AddressStatFields,
averages: AddressStatFields,
// Determines if the averages have been updated since the last time they were reported
pub averages_updated: Arc<AtomicBool>,
pub total_xact_count: Arc<AtomicU64>,
pub total_query_count: Arc<AtomicU64>,
pub total_received: Arc<AtomicU64>,
pub total_sent: Arc<AtomicU64>,
pub total_xact_time: Arc<AtomicU64>,
pub total_query_time: Arc<AtomicU64>,
pub total_wait_time: Arc<AtomicU64>,
pub total_errors: Arc<AtomicU64>,
pub avg_query_count: Arc<AtomicU64>,
pub avg_query_time: Arc<AtomicU64>,
pub avg_recv: Arc<AtomicU64>,
pub avg_sent: Arc<AtomicU64>,
pub avg_errors: Arc<AtomicU64>,
pub avg_xact_time: Arc<AtomicU64>,
pub avg_xact_count: Arc<AtomicU64>,
pub avg_wait_time: Arc<AtomicU64>,
}
impl IntoIterator for AddressStats {
@@ -34,67 +31,67 @@ impl IntoIterator for AddressStats {
vec![
(
"total_xact_count".to_string(),
self.total.xact_count.load(Ordering::Relaxed),
self.total_xact_count.load(Ordering::Relaxed),
),
(
"total_query_count".to_string(),
self.total.query_count.load(Ordering::Relaxed),
self.total_query_count.load(Ordering::Relaxed),
),
(
"total_received".to_string(),
self.total.bytes_received.load(Ordering::Relaxed),
self.total_received.load(Ordering::Relaxed),
),
(
"total_sent".to_string(),
self.total.bytes_sent.load(Ordering::Relaxed),
self.total_sent.load(Ordering::Relaxed),
),
(
"total_xact_time".to_string(),
self.total.xact_time.load(Ordering::Relaxed),
self.total_xact_time.load(Ordering::Relaxed),
),
(
"total_query_time".to_string(),
self.total.query_time.load(Ordering::Relaxed),
self.total_query_time.load(Ordering::Relaxed),
),
(
"total_wait_time".to_string(),
self.total.wait_time.load(Ordering::Relaxed),
self.total_wait_time.load(Ordering::Relaxed),
),
(
"total_errors".to_string(),
self.total.errors.load(Ordering::Relaxed),
self.total_errors.load(Ordering::Relaxed),
),
(
"avg_xact_count".to_string(),
self.averages.xact_count.load(Ordering::Relaxed),
self.avg_xact_count.load(Ordering::Relaxed),
),
(
"avg_query_count".to_string(),
self.averages.query_count.load(Ordering::Relaxed),
self.avg_query_count.load(Ordering::Relaxed),
),
(
"avg_recv".to_string(),
self.averages.bytes_received.load(Ordering::Relaxed),
self.avg_recv.load(Ordering::Relaxed),
),
(
"avg_sent".to_string(),
self.averages.bytes_sent.load(Ordering::Relaxed),
self.avg_sent.load(Ordering::Relaxed),
),
(
"avg_errors".to_string(),
self.averages.errors.load(Ordering::Relaxed),
self.avg_errors.load(Ordering::Relaxed),
),
(
"avg_xact_time".to_string(),
self.averages.xact_time.load(Ordering::Relaxed),
self.avg_xact_time.load(Ordering::Relaxed),
),
(
"avg_query_time".to_string(),
self.averages.query_time.load(Ordering::Relaxed),
self.avg_query_time.load(Ordering::Relaxed),
),
(
"avg_wait_time".to_string(),
self.averages.wait_time.load(Ordering::Relaxed),
self.avg_wait_time.load(Ordering::Relaxed),
),
]
.into_iter()
@@ -102,120 +99,22 @@ impl IntoIterator for AddressStats {
}
impl AddressStats {
pub fn xact_count_add(&self) {
self.total.xact_count.fetch_add(1, Ordering::Relaxed);
self.current.xact_count.fetch_add(1, Ordering::Relaxed);
}
pub fn query_count_add(&self) {
self.total.query_count.fetch_add(1, Ordering::Relaxed);
self.current.query_count.fetch_add(1, Ordering::Relaxed);
}
pub fn bytes_received_add(&self, bytes: u64) {
self.total
.bytes_received
.fetch_add(bytes, Ordering::Relaxed);
self.current
.bytes_received
.fetch_add(bytes, Ordering::Relaxed);
}
pub fn bytes_sent_add(&self, bytes: u64) {
self.total.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
self.current.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
}
pub fn xact_time_add(&self, time: u64) {
self.total.xact_time.fetch_add(time, Ordering::Relaxed);
self.current.xact_time.fetch_add(time, Ordering::Relaxed);
}
pub fn query_time_add(&self, time: u64) {
self.total.query_time.fetch_add(time, Ordering::Relaxed);
self.current.query_time.fetch_add(time, Ordering::Relaxed);
}
pub fn wait_time_add(&self, time: u64) {
self.total.wait_time.fetch_add(time, Ordering::Relaxed);
self.current.wait_time.fetch_add(time, Ordering::Relaxed);
}
pub fn error(&self) {
self.total.errors.fetch_add(1, Ordering::Relaxed);
self.current.errors.fetch_add(1, Ordering::Relaxed);
self.total_errors.fetch_add(1, Ordering::Relaxed);
}
pub fn update_averages(&self) {
let stat_period_per_second = crate::stats::STAT_PERIOD / 1_000;
// xact_count
let current_xact_count = self.current.xact_count.load(Ordering::Relaxed);
let current_xact_time = self.current.xact_time.load(Ordering::Relaxed);
self.averages.xact_count.store(
current_xact_count / stat_period_per_second,
Ordering::Relaxed,
);
if current_xact_count == 0 {
self.averages.xact_time.store(0, Ordering::Relaxed);
} else {
self.averages
.xact_time
.store(current_xact_time / current_xact_count, Ordering::Relaxed);
let (totals, averages) = self.fields_iterators();
for data in totals.iter().zip(averages.iter()) {
let (total, average) = data;
if let Err(err) = average.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
let total = total.load(Ordering::Relaxed);
let avg = (total - avg) / (crate::stats::STAT_PERIOD / 1_000); // Avg / second
Some(avg)
}) {
warn!("Could not update averages for addresses stats, {:?}", err);
}
}
// query_count
let current_query_count = self.current.query_count.load(Ordering::Relaxed);
let current_query_time = self.current.query_time.load(Ordering::Relaxed);
self.averages.query_count.store(
current_query_count / stat_period_per_second,
Ordering::Relaxed,
);
if current_query_count == 0 {
self.averages.query_time.store(0, Ordering::Relaxed);
} else {
self.averages
.query_time
.store(current_query_time / current_query_count, Ordering::Relaxed);
}
// bytes_received
let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed);
self.averages.bytes_received.store(
current_bytes_received / stat_period_per_second,
Ordering::Relaxed,
);
// bytes_sent
let current_bytes_sent = self.current.bytes_sent.load(Ordering::Relaxed);
self.averages.bytes_sent.store(
current_bytes_sent / stat_period_per_second,
Ordering::Relaxed,
);
// wait_time
let current_wait_time = self.current.wait_time.load(Ordering::Relaxed);
self.averages.wait_time.store(
current_wait_time / stat_period_per_second,
Ordering::Relaxed,
);
// errors
let current_errors = self.current.errors.load(Ordering::Relaxed);
self.averages
.errors
.store(current_errors / stat_period_per_second, Ordering::Relaxed);
}
pub fn reset_current_counts(&self) {
self.current.xact_count.store(0, Ordering::Relaxed);
self.current.xact_time.store(0, Ordering::Relaxed);
self.current.query_count.store(0, Ordering::Relaxed);
self.current.query_time.store(0, Ordering::Relaxed);
self.current.bytes_received.store(0, Ordering::Relaxed);
self.current.bytes_sent.store(0, Ordering::Relaxed);
self.current.wait_time.store(0, Ordering::Relaxed);
self.current.errors.store(0, Ordering::Relaxed);
}
pub fn populate_row(&self, row: &mut Vec<String>) {
@@ -223,4 +122,28 @@ impl AddressStats {
row.push(value.to_string());
}
}
fn fields_iterators(&self) -> (Vec<Arc<AtomicU64>>, Vec<Arc<AtomicU64>>) {
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
totals.push(self.total_xact_count.clone());
averages.push(self.avg_xact_count.clone());
totals.push(self.total_query_count.clone());
averages.push(self.avg_query_count.clone());
totals.push(self.total_received.clone());
averages.push(self.avg_recv.clone());
totals.push(self.total_sent.clone());
averages.push(self.avg_sent.clone());
totals.push(self.total_xact_time.clone());
averages.push(self.avg_xact_time.clone());
totals.push(self.total_query_time.clone());
averages.push(self.avg_query_time.clone());
totals.push(self.total_wait_time.clone());
averages.push(self.avg_wait_time.clone());
totals.push(self.total_errors.clone());
averages.push(self.avg_errors.clone());
(totals, averages)
}
}

View File

@@ -1,3 +1,4 @@
use super::PoolStats;
use super::{get_reporter, Reporter};
use atomic_enum::atomic_enum;
use std::sync::atomic::*;
@@ -33,14 +34,12 @@ pub struct ClientStats {
pool_name: String,
connect_time: Instant,
pool_stats: Arc<PoolStats>,
reporter: Reporter,
/// Total time spent waiting for a connection from pool, measures in microseconds
pub total_wait_time: Arc<AtomicU64>,
/// Maximum time spent waiting for a connection from pool, measures in microseconds
pub max_wait_time: Arc<AtomicU64>,
/// Current state of the client
pub state: Arc<AtomicClientState>,
@@ -62,8 +61,8 @@ impl Default for ClientStats {
application_name: String::new(),
username: String::new(),
pool_name: String::new(),
pool_stats: Arc::new(PoolStats::default()),
total_wait_time: Arc::new(AtomicU64::new(0)),
max_wait_time: Arc::new(AtomicU64::new(0)),
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
transaction_count: Arc::new(AtomicU64::new(0)),
query_count: Arc::new(AtomicU64::new(0)),
@@ -80,9 +79,11 @@ impl ClientStats {
username: &str,
pool_name: &str,
connect_time: Instant,
pool_stats: Arc<PoolStats>,
) -> Self {
Self {
client_id,
pool_stats,
connect_time,
application_name: application_name.to_string(),
username: username.to_string(),
@@ -91,10 +92,12 @@ impl ClientStats {
}
}
/// Reports a client is disconnecting from the pooler and
/// Reports a client is disconecting from the pooler and
/// update metrics on the corresponding pool.
pub fn disconnect(&self) {
self.reporter.client_disconnecting(self.client_id);
self.pool_stats
.client_disconnect(self.state.load(Ordering::Relaxed))
}
/// Register a client with the stats system. The stats system uses client_id
@@ -102,20 +105,27 @@ impl ClientStats {
pub fn register(&self, stats: Arc<ClientStats>) {
self.reporter.client_register(self.client_id, stats);
self.state.store(ClientState::Idle, Ordering::Relaxed);
self.pool_stats.cl_idle.fetch_add(1, Ordering::Relaxed);
}
/// Reports a client is done querying the server and is no longer assigned a server connection
pub fn idle(&self) {
self.pool_stats
.client_idle(self.state.load(Ordering::Relaxed));
self.state.store(ClientState::Idle, Ordering::Relaxed);
}
/// Reports a client is waiting for a connection
pub fn waiting(&self) {
self.pool_stats
.client_waiting(self.state.load(Ordering::Relaxed));
self.state.store(ClientState::Waiting, Ordering::Relaxed);
}
/// Reports a client is done waiting for a connection and is about to query the server.
pub fn active(&self) {
self.pool_stats
.client_active(self.state.load(Ordering::Relaxed));
self.state.store(ClientState::Active, Ordering::Relaxed);
}
@@ -130,12 +140,10 @@ impl ClientStats {
self.error_count.fetch_add(1, Ordering::Relaxed);
}
/// Reporters the time spent by a client waiting to get a healthy connection from the pool
/// Reportes the time spent by a client waiting to get a healthy connection from the pool
pub fn checkout_time(&self, microseconds: u64) {
self.total_wait_time
.fetch_add(microseconds, Ordering::Relaxed);
self.max_wait_time
.fetch_max(microseconds, Ordering::Relaxed);
}
/// Report a query executed by a client against a server

View File

@@ -1,131 +1,36 @@
use log::debug;
use super::{ClientState, ServerState};
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
use std::collections::HashMap;
use crate::config::Pool;
use crate::config::PoolMode;
use crate::pool::PoolIdentifier;
use std::sync::atomic::*;
use std::sync::Arc;
use crate::pool::get_all_pools;
use super::get_reporter;
use super::Reporter;
use super::{ClientState, ServerState};
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
/// A struct that holds information about a Pool .
pub struct PoolStats {
pub identifier: PoolIdentifier,
pub mode: PoolMode,
pub cl_idle: u64,
pub cl_active: u64,
pub cl_waiting: u64,
pub cl_cancel_req: u64,
pub sv_active: u64,
pub sv_idle: u64,
pub sv_used: u64,
pub sv_tested: u64,
pub sv_login: u64,
pub maxwait: u64,
}
impl PoolStats {
pub fn new(identifier: PoolIdentifier, mode: PoolMode) -> Self {
PoolStats {
identifier,
mode,
cl_idle: 0,
cl_active: 0,
cl_waiting: 0,
cl_cancel_req: 0,
sv_active: 0,
sv_idle: 0,
sv_used: 0,
sv_tested: 0,
sv_login: 0,
maxwait: 0,
}
}
// Pool identifier, cannot be changed after creating the instance
identifier: PoolIdentifier,
pub fn construct_pool_lookup() -> HashMap<PoolIdentifier, PoolStats> {
let mut map: HashMap<PoolIdentifier, PoolStats> = HashMap::new();
let client_map = super::get_client_stats();
let server_map = super::get_server_stats();
// Pool Config, cannot be changed after creating the instance
config: Pool,
for (identifier, pool) in get_all_pools() {
map.insert(
identifier.clone(),
PoolStats::new(identifier, pool.settings.pool_mode),
);
}
// A reference to the global reporter.
reporter: Reporter,
for client in client_map.values() {
match map.get_mut(&PoolIdentifier {
db: client.pool_name(),
user: client.username(),
}) {
Some(pool_stats) => {
match client.state.load(Ordering::Relaxed) {
ClientState::Active => pool_stats.cl_active += 1,
ClientState::Idle => pool_stats.cl_idle += 1,
ClientState::Waiting => pool_stats.cl_waiting += 1,
}
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
}
None => debug!("Client from an obselete pool"),
}
}
for server in server_map.values() {
match map.get_mut(&PoolIdentifier {
db: server.pool_name(),
user: server.username(),
}) {
Some(pool_stats) => match server.state.load(Ordering::Relaxed) {
ServerState::Active => pool_stats.sv_active += 1,
ServerState::Idle => pool_stats.sv_idle += 1,
ServerState::Login => pool_stats.sv_login += 1,
ServerState::Tested => pool_stats.sv_tested += 1,
},
None => debug!("Server from an obselete pool"),
}
}
map
}
pub fn generate_header() -> Vec<(&'static str, DataType)> {
vec![
("database", DataType::Text),
("user", DataType::Text),
("pool_mode", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric),
("sv_active", DataType::Numeric),
("sv_idle", DataType::Numeric),
("sv_used", DataType::Numeric),
("sv_tested", DataType::Numeric),
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
]
}
pub fn generate_row(&self) -> Vec<String> {
vec![
self.identifier.db.clone(),
self.identifier.user.clone(),
self.mode.to_string(),
self.cl_idle.to_string(),
self.cl_active.to_string(),
self.cl_waiting.to_string(),
self.cl_cancel_req.to_string(),
self.sv_active.to_string(),
self.sv_idle.to_string(),
self.sv_used.to_string(),
self.sv_tested.to_string(),
self.sv_login.to_string(),
(self.maxwait / 1_000_000).to_string(),
(self.maxwait % 1_000_000).to_string(),
]
}
/// Counters (atomics)
pub cl_idle: Arc<AtomicU64>,
pub cl_active: Arc<AtomicU64>,
pub cl_waiting: Arc<AtomicU64>,
pub cl_cancel_req: Arc<AtomicU64>,
pub sv_active: Arc<AtomicU64>,
pub sv_idle: Arc<AtomicU64>,
pub sv_used: Arc<AtomicU64>,
pub sv_tested: Arc<AtomicU64>,
pub sv_login: Arc<AtomicU64>,
pub maxwait: Arc<AtomicU64>,
}
impl IntoIterator for PoolStats {
@@ -134,18 +39,243 @@ impl IntoIterator for PoolStats {
fn into_iter(self) -> Self::IntoIter {
vec![
("cl_idle".to_string(), self.cl_idle),
("cl_active".to_string(), self.cl_active),
("cl_waiting".to_string(), self.cl_waiting),
("cl_cancel_req".to_string(), self.cl_cancel_req),
("sv_active".to_string(), self.sv_active),
("sv_idle".to_string(), self.sv_idle),
("sv_used".to_string(), self.sv_used),
("sv_tested".to_string(), self.sv_tested),
("sv_login".to_string(), self.sv_login),
("maxwait".to_string(), self.maxwait / 1_000_000),
("maxwait_us".to_string(), self.maxwait % 1_000_000),
("cl_idle".to_string(), self.cl_idle.load(Ordering::Relaxed)),
(
"cl_active".to_string(),
self.cl_active.load(Ordering::Relaxed),
),
(
"cl_waiting".to_string(),
self.cl_waiting.load(Ordering::Relaxed),
),
(
"cl_cancel_req".to_string(),
self.cl_cancel_req.load(Ordering::Relaxed),
),
(
"sv_active".to_string(),
self.sv_active.load(Ordering::Relaxed),
),
("sv_idle".to_string(), self.sv_idle.load(Ordering::Relaxed)),
("sv_used".to_string(), self.sv_used.load(Ordering::Relaxed)),
(
"sv_tested".to_string(),
self.sv_tested.load(Ordering::Relaxed),
),
(
"sv_login".to_string(),
self.sv_login.load(Ordering::Relaxed),
),
(
"maxwait".to_string(),
self.maxwait.load(Ordering::Relaxed) / 1_000_000,
),
(
"maxwait_us".to_string(),
self.maxwait.load(Ordering::Relaxed) % 1_000_000,
),
]
.into_iter()
}
}
impl PoolStats {
pub fn new(identifier: PoolIdentifier, config: Pool) -> Self {
Self {
identifier,
config,
reporter: get_reporter(),
..Default::default()
}
}
// Getters
pub fn register(&self, stats: Arc<PoolStats>) {
self.reporter.pool_register(self.identifier.clone(), stats);
}
pub fn database(&self) -> String {
self.identifier.db.clone()
}
pub fn user(&self) -> String {
self.identifier.user.clone()
}
pub fn redacted_secret(&self) -> String {
match self.identifier.secret {
Some(ref s) => format!("****{}", &s[s.len() - 4..]),
None => "<no secret>".to_string(),
}
}
pub fn pool_mode(&self) -> PoolMode {
self.config.pool_mode
}
/// Populates an array of strings with counters (used by admin in show pools)
pub fn populate_row(&self, row: &mut Vec<String>) {
for (_key, value) in self.clone() {
row.push(value.to_string());
}
}
/// Deletes the maxwait counter, this is done everytime we obtain metrics
pub fn clear_maxwait(&self) {
self.maxwait.store(0, Ordering::Relaxed);
}
/// Notified when a server of the pool enters login state.
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_login(&self, from: ServerState) {
self.sv_login.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Login {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'active'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_active(&self, from: ServerState) {
self.sv_active.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Active {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'tested'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_tested(&self, from: ServerState) {
self.sv_tested.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Tested {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'idle'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_idle(&self, from: ServerState) {
self.sv_idle.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Idle {
self.decrease_from_server_state(from);
}
}
/// Notified when a client of the pool become 'waiting'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_waiting(&self, from: ClientState) {
if from != ClientState::Waiting {
self.cl_waiting.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client of the pool become 'active'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_active(&self, from: ClientState) {
if from != ClientState::Active {
self.cl_active.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client of the pool become 'idle'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_idle(&self, from: ClientState) {
if from != ClientState::Idle {
self.cl_idle.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client disconnects.
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_disconnect(&self, from: ClientState) {
let counter = match from {
ClientState::Idle => &self.cl_idle,
ClientState::Waiting => &self.cl_waiting,
ClientState::Active => &self.cl_active,
};
Self::decrease_counter(counter.clone());
}
/// Notified when a server disconnects.
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn server_disconnect(&self, from: ServerState) {
let counter = match from {
ServerState::Active => &self.sv_active,
ServerState::Idle => &self.sv_idle,
ServerState::Login => &self.sv_login,
ServerState::Tested => &self.sv_tested,
};
Self::decrease_counter(counter.clone());
}
// helpers for counter decrease
fn decrease_from_server_state(&self, from: ServerState) {
let counter = match from {
ServerState::Tested => &self.sv_tested,
ServerState::Active => &self.sv_active,
ServerState::Idle => &self.sv_idle,
ServerState::Login => &self.sv_login,
};
Self::decrease_counter(counter.clone());
}
fn decrease_from_client_state(&self, from: ClientState) {
let counter = match from {
ClientState::Active => &self.cl_active,
ClientState::Idle => &self.cl_idle,
ClientState::Waiting => &self.cl_waiting,
};
Self::decrease_counter(counter.clone());
}
fn decrease_counter(value: Arc<AtomicU64>) {
if value.load(Ordering::Relaxed) > 0 {
value.fetch_sub(1, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_decrease() {
let stat: PoolStats = PoolStats::default();
stat.server_login(ServerState::Login);
stat.server_idle(ServerState::Login);
assert_eq!(stat.sv_login.load(Ordering::Relaxed), 0);
assert_eq!(stat.sv_idle.load(Ordering::Relaxed), 1);
}
}

View File

@@ -1,4 +1,5 @@
use super::AddressStats;
use super::PoolStats;
use super::{get_reporter, Reporter};
use crate::config::Address;
use atomic_enum::atomic_enum;
@@ -37,6 +38,7 @@ pub struct ServerStats {
address: Address,
connect_time: Instant,
pool_stats: Arc<PoolStats>,
reporter: Reporter,
/// Data
@@ -47,10 +49,6 @@ pub struct ServerStats {
pub transaction_count: Arc<AtomicU64>,
pub query_count: Arc<AtomicU64>,
pub error_count: Arc<AtomicU64>,
pub prepared_hit_count: Arc<AtomicU64>,
pub prepared_miss_count: Arc<AtomicU64>,
pub prepared_eviction_count: Arc<AtomicU64>,
pub prepared_cache_size: Arc<AtomicU64>,
}
impl Default for ServerStats {
@@ -59,6 +57,7 @@ impl Default for ServerStats {
server_id: 0,
application_name: Arc::new(RwLock::new(String::new())),
address: Address::default(),
pool_stats: Arc::new(PoolStats::default()),
connect_time: Instant::now(),
state: Arc::new(AtomicServerState::new(ServerState::Login)),
bytes_sent: Arc::new(AtomicU64::new(0)),
@@ -67,18 +66,15 @@ impl Default for ServerStats {
query_count: Arc::new(AtomicU64::new(0)),
error_count: Arc::new(AtomicU64::new(0)),
reporter: get_reporter(),
prepared_hit_count: Arc::new(AtomicU64::new(0)),
prepared_miss_count: Arc::new(AtomicU64::new(0)),
prepared_eviction_count: Arc::new(AtomicU64::new(0)),
prepared_cache_size: Arc::new(AtomicU64::new(0)),
}
}
}
impl ServerStats {
pub fn new(address: Address, connect_time: Instant) -> Self {
pub fn new(address: Address, pool_stats: Arc<PoolStats>, connect_time: Instant) -> Self {
Self {
address,
pool_stats,
connect_time,
server_id: rand::random::<i32>(),
..Default::default()
@@ -100,23 +96,33 @@ impl ServerStats {
/// Reports a server connection is no longer assigned to a client
/// and is available for the next client to pick it up
pub fn idle(&self) {
self.pool_stats
.server_idle(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Idle, Ordering::Relaxed);
self.set_undefined_application();
}
/// Reports a server connection is disconnecting from the pooler.
/// Reports a server connection is disconecting from the pooler.
/// Also updates metrics on the pool regarding server usage.
pub fn disconnect(&self) {
self.reporter.server_disconnecting(self.server_id);
self.pool_stats
.server_disconnect(self.state.load(Ordering::Relaxed))
}
/// Reports a server connection is being tested before being given to a client.
pub fn tested(&self) {
self.set_undefined_application();
self.pool_stats
.server_tested(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Tested, Ordering::Relaxed);
}
/// Reports a server connection is attempting to login.
pub fn login(&self) {
self.pool_stats
.server_login(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Login, Ordering::Relaxed);
self.set_undefined_application();
}
@@ -124,6 +130,8 @@ impl ServerStats {
/// Reports a server connection has been assigned to a client that
/// is about to query the server
pub fn active(&self, application_name: String) {
self.pool_stats
.server_active(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Active, Ordering::Relaxed);
self.set_application(application_name);
}
@@ -132,24 +140,13 @@ impl ServerStats {
self.address.stats.clone()
}
pub fn check_address_stat_average_is_updated_status(&self) -> bool {
self.address.stats.averages_updated.load(Ordering::Relaxed)
}
pub fn set_address_stat_average_is_updated_status(&self, is_checked: bool) {
self.address
.stats
.averages_updated
.store(is_checked, Ordering::Relaxed);
}
// Helper methods for show_servers
pub fn pool_name(&self) -> String {
self.address.pool_name.clone()
self.pool_stats.database()
}
pub fn username(&self) -> String {
self.address.username.clone()
self.pool_stats.user()
}
pub fn address_name(&self) -> String {
@@ -170,17 +167,27 @@ impl ServerStats {
}
pub fn checkout_time(&self, microseconds: u64, application_name: String) {
// Update server stats and address aggregation stats
// Update server stats and address aggergation stats
self.set_application(application_name);
self.address.stats.wait_time_add(microseconds);
self.address
.stats
.total_wait_time
.fetch_add(microseconds, Ordering::Relaxed);
self.pool_stats
.maxwait
.fetch_max(microseconds, Ordering::Relaxed);
}
/// Report a query executed by a client against a server
pub fn query(&self, milliseconds: u64, application_name: &str) {
self.set_application(application_name.to_string());
self.address.stats.query_count_add();
self.address.stats.query_time_add(milliseconds);
self.query_count.fetch_add(1, Ordering::Relaxed);
let address_stats = self.address_stats();
address_stats
.total_query_count
.fetch_add(1, Ordering::Relaxed);
address_stats
.total_query_time
.fetch_add(milliseconds, Ordering::Relaxed);
}
/// Report a transaction executed by a client a server
@@ -191,39 +198,29 @@ impl ServerStats {
self.set_application(application_name.to_string());
self.transaction_count.fetch_add(1, Ordering::Relaxed);
self.address.stats.xact_count_add();
self.address
.stats
.total_xact_count
.fetch_add(1, Ordering::Relaxed);
}
/// Report data sent to a server
pub fn data_sent(&self, amount_bytes: usize) {
self.bytes_sent
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
self.address.stats.bytes_sent_add(amount_bytes as u64);
self.address
.stats
.total_sent
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
}
/// Report data received from a server
pub fn data_received(&self, amount_bytes: usize) {
self.bytes_received
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
self.address.stats.bytes_received_add(amount_bytes as u64);
}
/// Report a prepared statement that already exists on the server.
pub fn prepared_cache_hit(&self) {
self.prepared_hit_count.fetch_add(1, Ordering::Relaxed);
}
/// Report a prepared statement that does not exist on the server yet.
pub fn prepared_cache_miss(&self) {
self.prepared_miss_count.fetch_add(1, Ordering::Relaxed);
}
pub fn prepared_cache_add(&self) {
self.prepared_cache_size.fetch_add(1, Ordering::Relaxed);
}
pub fn prepared_cache_remove(&self) {
self.prepared_eviction_count.fetch_add(1, Ordering::Relaxed);
self.prepared_cache_size.fetch_sub(1, Ordering::Relaxed);
self.address
.stats
.total_received
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
}
}

View File

@@ -4,12 +4,7 @@ use rustls_pemfile::{certs, read_one, Item};
use std::iter;
use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;
use tokio_rustls::rustls::{
self,
client::{ServerCertVerified, ServerCertVerifier},
Certificate, PrivateKey, ServerName,
};
use tokio_rustls::rustls::{self, Certificate, PrivateKey};
use tokio_rustls::TlsAcceptor;
use crate::config::get_config;
@@ -69,19 +64,3 @@ impl Tls {
})
}
}
pub struct NoCertificateVerification;
impl ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &Certificate,
_intermediates: &[Certificate],
_server_name: &ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: SystemTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}
}

View File

@@ -1,13 +1,8 @@
FROM rust:bullseye
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y
RUN cargo install cargo-binutils rustfilt
RUN rustup component add llvm-tools-preview
RUN sudo gem install bundler
RUN wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i toxiproxy-2.4.0.deb
RUN wget -O go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm go1.21.3.linux-$(dpkg --print-architecture).tar.gz

View File

@@ -1,5 +0,0 @@
module pgcat
go 1.21
require github.com/lib/pq v1.10.9

View File

@@ -1,2 +0,0 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

View File

@@ -1,162 +0,0 @@
#
# PgCat config example.
#
#
# General pooler settings
[general]
# What IP to run on, 0.0.0.0 means accessible from everywhere.
host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = "${PORT}"
# Whether to enable prometheus exporter or not.
enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms).
connect_timeout = 1000
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000
# How long to keep connection available for immediate re-use, without running a healthcheck query on it
healthcheck_delay = 30000
# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 5000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
# If we should log client connections
log_client_connections = false
# If we should log client disconnections
log_client_disconnections = false
# Reload config automatically if it changes.
autoreload = 15000
server_round_robin = false
# TLS
tls_certificate = "../../.circleci/server.cert"
tls_private_key = "../../.circleci/server.key"
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
admin_username = "admin_user"
admin_password = "admin_pass"
# pool
# configs are structured as pool.<pool_name>
# the pool_name is what clients use as database name when connecting
# For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded_db"
[pools.sharded_db]
# Pool mode (see PgBouncer docs for more).
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"
# Query parser. If enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"
# Prepared statements cache size.
prepared_statements_cache_size = 500
# Credentials for users that may connect to this cluster
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
# Maximum number of server connections that can be established for this user
# The maximum number of connection from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users.
pool_size = 5
statement_timeout = 0
[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 30000
# Shard 0
[pools.sharded_db.shards.0]
# [ host, port, role ]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
# Database name (e.g. "postgres")
database = "shard0"
[pools.sharded_db.shards.1]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard1"
[pools.sharded_db.shards.2]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard2"
[pools.simple_db]
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"
[pools.simple_db.users.0]
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 30000
[pools.simple_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
database = "some_db"

View File

@@ -1,52 +0,0 @@
package pgcat
import (
"context"
"database/sql"
"fmt"
_ "github.com/lib/pq"
"testing"
)
func Test(t *testing.T) {
t.Cleanup(setup(t))
t.Run("Named parameterized prepared statement works", namedParameterizedPreparedStatement)
t.Run("Unnamed parameterized prepared statement works", unnamedParameterizedPreparedStatement)
}
func namedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
stmt, err := db.Prepare("SELECT $1")
if err != nil {
t.Fatalf("could not prepare: %+v", err)
}
for i := 0; i < 100; i++ {
rows, err := stmt.Query(1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}
func unnamedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
for i := 0; i < 100; i++ {
// Under the hood QueryContext generates an unnamed parameterized prepared statement
rows, err := db.QueryContext(context.Background(), "SELECT $1", 1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}

View File

@@ -1,81 +0,0 @@
package pgcat
import (
"context"
"database/sql"
_ "embed"
"fmt"
"math/rand"
"os"
"os/exec"
"strings"
"testing"
"time"
)
//go:embed pgcat.toml
var pgcatCfg string
var port = rand.Intn(32760-20000) + 20000
func setup(t *testing.T) func() {
cfg, err := os.CreateTemp("/tmp", "pgcat_cfg_*.toml")
if err != nil {
t.Fatalf("could not create temp file: %+v", err)
}
pgcatCfg = strings.Replace(pgcatCfg, "\"${PORT}\"", fmt.Sprintf("%d", port), 1)
_, err = cfg.Write([]byte(pgcatCfg))
if err != nil {
t.Fatalf("could not write temp file: %+v", err)
}
commandPath := "../../target/debug/pgcat"
if os.Getenv("CARGO_TARGET_DIR") != "" {
commandPath = os.Getenv("CARGO_TARGET_DIR") + "/debug/pgcat"
}
cmd := exec.Command(commandPath, cfg.Name())
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
go func() {
err = cmd.Run()
if err != nil {
t.Errorf("could not run pgcat: %+v", err)
}
}()
deadline, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancelFunc()
for {
select {
case <-deadline.Done():
break
case <-time.After(50 * time.Millisecond):
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=pgcat user=admin_user password=admin_pass sslmode=disable", port))
if err != nil {
continue
}
rows, err := db.QueryContext(deadline, "SHOW STATS")
if err != nil {
continue
}
_ = rows.Close()
_ = db.Close()
break
}
break
}
return func() {
err := cmd.Process.Signal(os.Interrupt)
if err != nil {
t.Fatalf("could not interrupt pgcat: %+v", err)
}
err = os.Remove(cfg.Name())
if err != nil {
t.Fatalf("could not remove temp file: %+v", err)
}
}
}

View File

@@ -36,4 +36,4 @@ SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
SET SERVER ROLE TO 'replica';
-- Read load balancing
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;

View File

@@ -63,7 +63,6 @@ def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.
def test_normal_db_access():
pgcat_start()
conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()

View File

@@ -11,6 +11,323 @@ describe "Admin" do
processes.pgcat.shutdown
end
describe "SHOW STATS" do
context "clients connect and make one query" do
it "updates *_query_time and *_wait_time" do
connection = PG::connect("#{pgcat_conn_str}?application_name=one_query")
connection.async_exec("SELECT pg_sleep(0.25)")
connection.async_exec("SELECT pg_sleep(0.25)")
connection.async_exec("SELECT pg_sleep(0.25)")
connection.close
# wait for averages to be calculated, we shouldn't do this too often
sleep(15.5)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
expect(results["total_query_time"].to_i).to be_within(200).of(750)
expect(results["avg_query_time"].to_i).to_not eq(0)
expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
end
end
end
describe "SHOW POOLS" do
context "bad credentials" do
it "does not change any stats" do
bad_passsword_url = URI(pgcat_conn_str)
bad_passsword_url.password = "wrong"
expect { PG::connect("#{bad_passsword_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "bad database name" do
it "does not change any stats" do
bad_db_url = URI(pgcat_conn_str)
bad_db_url.path = "/wrong_db"
expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "client connects but issues no queries" do
it "only affects cl_idle stats" do
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("20")
expect(results["sv_idle"]).to eq("1")
connections.map(&:close)
sleep(1.1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "clients connect and make one query" do
it "only affects cl_idle, sv_idle stats" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(2.5)") }
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_active"]).to eq("5")
expect(results["sv_active"]).to eq("5")
sleep(3)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("5")
connections.map(&:close)
sleep(1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client connects and opens a transaction and closes connection uncleanly" do
it "produces correct statistics" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new do
c.async_exec("BEGIN")
c.async_exec("SELECT pg_sleep(0.01)")
c.close
end
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client fail to checkout connection from the pool" do
it "counts clients as idle" do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
threads = []
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError }
end
sleep(2)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("1")
threads.map(&:join)
connections.map(&:close)
end
end
context "clients connects and disconnect normally" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it 'shows the same number of clients before and after' do
clients_before = clients_connected_to_pool(processes: processes)
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
clients_between = clients_connected_to_pool(processes: processes)
expect(clients_before).not_to eq(clients_between)
connections.each(&:close)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients connects and disconnect abruptly" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
it 'shows the same number of clients before and after' do
threads = []
connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
clients_before = clients_connected_to_pool(processes: processes)
random_string = (0...8).map { (65 + rand(26)).chr }.join
connection_string = "#{pgcat_conn_str}?application_name=#{random_string}"
faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null")
sleep(1)
# psql starts two processes, we only know the pid of the parent, this
# ensure both are killed
`pkill -9 -f '#{random_string}'`
Process.wait(faulty_client)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients overwhelm server pools" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "cl_waiting is updated to show it" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end
sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_waiting"]).to eq("2")
expect(results["cl_active"]).to eq("2")
expect(results["sv_active"]).to eq("2")
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("4")
expect(results["sv_idle"]).to eq("2")
threads.map(&:join)
connections.map(&:close)
end
it "show correct max_wait" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
sleep(4.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("0")
threads.map(&:join)
connections.map(&:close)
end
end
end
describe "SHOW CLIENTS" do
it "reports correct number and application names" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") }
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(21) # count admin clients
expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8)
expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1)
connections[0..5].map(&:close)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(15)
connections[6..].map(&:close)
sleep(1) # Wait for stats to be updated
expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1)
admin_conn.close
end
it "reports correct number of queries and transactions" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") }
connections.each do |c|
c.async_exec("SELECT 1")
c.async_exec("SELECT 2")
c.async_exec("SELECT 3")
c.async_exec("BEGIN")
c.async_exec("SELECT 4")
c.async_exec("SELECT 5")
c.async_exec("COMMIT")
end
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(3)
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
expect(normal_client_results[0]["transaction_count"]).to eq("4")
expect(normal_client_results[1]["transaction_count"]).to eq("4")
expect(normal_client_results[0]["query_count"]).to eq("7")
expect(normal_client_results[1]["query_count"]).to eq("7")
admin_conn.close
connections.map(&:close)
end
end
describe "Manual Banning" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 10) }
before do
@@ -81,7 +398,7 @@ describe "Admin" do
end
end
describe "SHOW USERS" do
describe "SHOW users" do
it "returns the right users" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW USERS")[0]
@@ -90,28 +407,4 @@ describe "Admin" do
expect(results["pool_mode"]).to eq("transaction")
end
end
describe "PAUSE" do
it "pauses all pools" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])
admin_conn.async_exec("PAUSE")
results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["1"])
admin_conn.async_exec("RESUME")
results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])
end
it "handles errors" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
expect { admin_conn.async_exec("PAUSE foo").to_a }.to raise_error(PG::SystemError)
expect { admin_conn.async_exec("PAUSE foo,bar").to_a }.to raise_error(PG::SystemError)
end
end
end

View File

@@ -67,7 +67,7 @@ describe "Auth Query" do
end
context 'and with cleartext passwords not set' do
let(:config_user) { { 'username' => 'sharding_user', 'password' => 'sharding_user' } }
let(:config_user) { { 'username' => 'sharding_user' } }
it 'it uses obtained passwords' do
connection_string = processes.pgcat.connection_string("sharded_db", pg_user['username'], pg_user['password'])
@@ -76,7 +76,7 @@ describe "Auth Query" do
end
it 'allows passwords to be changed without closing existing connections' do
pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username']))
pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'], pg_user['password']))
expect(pgconn.exec("SELECT 1 + 2")).not_to be_nil
Helpers::AuthQuery.exec_in_instances(query: "ALTER USER #{pg_user['username']} WITH ENCRYPTED PASSWORD 'secret2';")
expect(pgconn.exec("SELECT 1 + 4")).not_to be_nil
@@ -84,7 +84,7 @@ describe "Auth Query" do
end
it 'allows passwords to be changed and that new password is needed when reconnecting' do
pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username']))
pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'], pg_user['password']))
expect(pgconn.exec("SELECT 1 + 2")).not_to be_nil
Helpers::AuthQuery.exec_in_instances(query: "ALTER USER #{pg_user['username']} WITH ENCRYPTED PASSWORD 'secret2';")
newconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'], 'secret2'))
@@ -185,7 +185,7 @@ describe "Auth Query" do
},
}
}
}
}
context 'and with cleartext passwords set' do
it 'it uses local passwords' do

39
tests/ruby/auth_spec.rb Normal file
View File

@@ -0,0 +1,39 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "Authentication" do
describe "multiple secrets configured" do
let(:secrets) { ["one_secret", "two_secret"] }
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5, pool_mode="transaction", lb_mode="random", log_level="info", secrets=["one_secret", "two_secret"]) }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
it "can connect using all secrets and postgres password" do
secrets.push("sharding_user").each do |secret|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user", password=secret))
conn.exec("SELECT current_user")
end
end
end
describe "no secrets configured" do
let(:secrets) { [] }
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5, pool_mode="transaction", lb_mode="random", log_level="info") }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
it "can connect using only the password" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.exec("SELECT current_user")
expect { PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user", password="secret_one")) }.to raise_error PG::ConnectionBad
end
end
end

View File

@@ -1,102 +0,0 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "COPY Handling" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 5) }
before do
new_configs = processes.pgcat.current_config
# Allow connections in the pool to expire faster
new_configs["general"]["idle_timeout"] = 5
processes.pgcat.update_config(new_configs)
# We need to kill the old process that was using the default configs
processes.pgcat.stop
processes.pgcat.start
processes.pgcat.wait_until_ready
end
before do
processes.all_databases.first.with_connection do |conn|
conn.async_exec "CREATE TABLE copy_test_table (a TEXT,b TEXT,c TEXT,d TEXT)"
end
end
after do
processes.all_databases.first.with_connection do |conn|
conn.async_exec "DROP TABLE copy_test_table;"
end
end
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "COPY FROM" do
context "within transaction" do
it "finishes within alloted time" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Timeout.timeout(3) do
conn.async_exec("BEGIN")
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
sleep 0.5
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
conn.async_exec("COMMIT")
end
res = conn.async_exec("SELECT * FROM copy_test_table").to_a
expect(res).to eq([
{"a"=>"some", "b"=>"data", "c"=>"to", "d"=>"copy"},
{"a"=>"more", "b"=>"data", "c"=>"to", "d"=>"copy"}
])
end
end
context "outside transaction" do
it "finishes within alloted time" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Timeout.timeout(3) do
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
sleep 0.5
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
end
res = conn.async_exec("SELECT * FROM copy_test_table").to_a
expect(res).to eq([
{"a"=>"some", "b"=>"data", "c"=>"to", "d"=>"copy"},
{"a"=>"more", "b"=>"data", "c"=>"to", "d"=>"copy"}
])
end
end
end
describe "COPY TO" do
before do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
conn.async_exec("COMMIT")
conn.close
end
it "works" do
res = []
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.copy_data "COPY copy_test_table TO STDOUT CSV" do
while row=conn.get_copy_data
res << row
end
end
expect(res).to eq(["some,data,to,copy\n", "more,data,to,copy\n"])
end
end
end

View File

@@ -33,18 +33,18 @@ module Helpers
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_i, "primary"],
["localhost", replica.port.to_i, "replica"],
["localhost", primary.port.to_s, "primary"],
["localhost", replica.port.to_s, "replica"],
]
},
},
"users" => { "0" => user.merge(config_user) }
}
}
pgcat_cfg["general"]["port"] = pgcat.port.to_i
pgcat_cfg["general"]["port"] = pgcat.port
pgcat.update_config(pgcat_cfg)
pgcat.start
pgcat.wait_until_ready(
pgcat.connection_string(
"sharded_db",
@@ -92,13 +92,13 @@ module Helpers
"0" => {
"database" => database,
"servers" => [
["localhost", primary.port.to_i, "primary"],
["localhost", replica.port.to_i, "replica"],
["localhost", primary.port.to_s, "primary"],
["localhost", replica.port.to_s, "replica"],
]
},
},
"users" => { "0" => user.merge(config_user) }
}
}
end
# Main proxy configs
pgcat_cfg["pools"] = {
@@ -109,7 +109,7 @@ module Helpers
pgcat_cfg["general"]["port"] = pgcat.port
pgcat.update_config(pgcat_cfg.deep_merge(extra_conf))
pgcat.start
pgcat.wait_until_ready(pgcat.connection_string("sharded_db0", pg_user['username'], pg_user['password']))
OpenStruct.new.tap do |struct|

View File

@@ -7,24 +7,10 @@ class PgInstance
attr_reader :password
attr_reader :database_name
def self.mass_takedown(databases)
raise StandardError "block missing" unless block_given?
databases.each do |database|
database.toxiproxy.toxic(:limit_data, bytes: 1).toxics.each(&:save)
end
sleep 0.1
yield
ensure
databases.each do |database|
database.toxiproxy.toxics.each(&:destroy)
end
end
def initialize(port, username, password, database_name)
@original_port = port.to_i
@original_port = port
@toxiproxy_port = 10000 + port.to_i
@port = @toxiproxy_port.to_i
@port = @toxiproxy_port
@username = username
@password = password
@@ -62,9 +48,9 @@ class PgInstance
def take_down
if block_given?
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 1).apply { yield }
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
else
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 1).toxics.each(&:save)
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
end
end
@@ -103,6 +89,6 @@ class PgInstance
end
def count_select_1_plus_2
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query LIKE '%SELECT $1 + $2%'")[0]["sum"].to_i }
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
end
end

View File

@@ -1,259 +0,0 @@
require 'socket'
require 'digest/md5'
BACKEND_MESSAGE_CODES = {
'Z' => "ReadyForQuery",
'C' => "CommandComplete",
'T' => "RowDescription",
'D' => "DataRow",
'1' => "ParseComplete",
'2' => "BindComplete",
'E' => "ErrorResponse",
's' => "PortalSuspended",
}
class PostgresSocket
def initialize(host, port)
@port = port
@host = host
@socket = TCPSocket.new @host, @port
@parameters = {}
@verbose = true
end
def send_md5_password_message(username, password, salt)
m = Digest::MD5.hexdigest(password + username)
m = Digest::MD5.hexdigest(m + salt.map(&:chr).join(""))
m = 'md5' + m
bytes = (m.split("").map(&:ord) + [0]).flatten
message_size = bytes.count + 4
message = []
message << 'p'.ord
message << [message_size].pack('l>').unpack('CCCC') # 4
message << bytes
message.flatten!
@socket.write(message.pack('C*'))
end
def send_startup_message(username, database, password)
message = []
message << [196608].pack('l>').unpack('CCCC') # 4
message << "user".split('').map(&:ord) # 4, 8
message << 0 # 1, 9
message << username.split('').map(&:ord) # 2, 11
message << 0 # 1, 12
message << "database".split('').map(&:ord) # 8, 20
message << 0 # 1, 21
message << database.split('').map(&:ord) # 2, 23
message << 0 # 1, 24
message << 0 # 1, 25
message.flatten!
total_message_size = message.size + 4
message_len = [total_message_size].pack('l>').unpack('CCCC')
@socket.write([message_len + message].flatten.pack('C*'))
sleep 0.1
read_startup_response(username, password)
end
def read_startup_response(username, password)
message_code, message_len = @socket.recv(5).unpack("al>")
while message_code == 'R'
auth_code = @socket.recv(4).unpack('l>').pop
case auth_code
when 5 # md5
salt = @socket.recv(4).unpack('CCCC')
send_md5_password_message(username, password, salt)
message_code, message_len = @socket.recv(5).unpack("al>")
when 0 # trust
break
end
end
loop do
message_code, message_len = @socket.recv(5).unpack("al>")
if message_code == 'Z'
@socket.recv(1).unpack("a") # most likely I
break # We are good to go
end
if message_code == 'S'
actual_message = @socket.recv(message_len - 4).unpack("C*")
k,v = actual_message.pack('U*').split(/\x00/)
@parameters[k] = v
end
if message_code == 'K'
process_id, secret_key = @socket.recv(message_len - 4).unpack("l>l>")
@parameters["process_id"] = process_id
@parameters["secret_key"] = secret_key
end
end
return @parameters
end
def cancel_query
socket = TCPSocket.new @host, @port
process_key = @parameters["process_id"]
secret_key = @parameters["secret_key"]
message = []
message << [16].pack('l>').unpack('CCCC') # 4
message << [80877102].pack('l>').unpack('CCCC') # 4
message << [process_key.to_i].pack('l>').unpack('CCCC') # 4
message << [secret_key.to_i].pack('l>').unpack('CCCC') # 4
message.flatten!
socket.write(message.flatten.pack('C*'))
socket.close
log "[F] Sent CancelRequest message"
end
def send_query_message(query)
query_size = query.length
message_size = 1 + 4 + query_size
message = []
message << "Q".ord
message << [message_size].pack('l>').unpack('CCCC') # 4
message << query.split('').map(&:ord) # 2, 11
message << 0 # 1, 12
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent Q message (#{query})"
end
def send_parse_message(query)
query_size = query.length
message_size = 2 + 2 + 4 + query_size
message = []
message << "P".ord
message << [message_size].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << query.split('').map(&:ord) # 2, 11
message << 0 # 1, 12
message << [0, 0]
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent P message (#{query})"
end
def send_bind_message
message = []
message << "B".ord
message << [12].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << 0 # unnamed statement
message << [0, 0] # 2
message << [0, 0] # 2
message << [0, 0] # 2
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent B message"
end
def send_describe_message(mode)
message = []
message << "D".ord
message << [6].pack('l>').unpack('CCCC') # 4
message << mode.ord
message << 0 # unnamed statement
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent D message"
end
def send_execute_message(limit=0)
message = []
message << "E".ord
message << [9].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << [limit].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent E message"
end
def send_sync_message
message = []
message << "S".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent S message"
end
def send_copydone_message
message = []
message << "c".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent c message"
end
def send_copyfail_message
message = []
message << "f".ord
message << [5].pack('l>').unpack('CCCC') # 4
message << 0
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent f message"
end
def send_flush_message
message = []
message << "H".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent H message"
end
def read_from_server()
output_messages = []
retry_count = 0
message_code = nil
message_len = 0
loop do
begin
message_code, message_len = @socket.recv_nonblock(5).unpack("al>")
rescue IO::WaitReadable
return output_messages if retry_count > 50
retry_count += 1
sleep(0.01)
next
end
message = {
code: message_code,
len: message_len,
bytes: []
}
log "[B] #{BACKEND_MESSAGE_CODES[message_code] || ('UnknownMessage(' + message_code + ')')}"
actual_message_length = message_len - 4
if actual_message_length > 0
message[:bytes] = @socket.recv(message_len - 4).unpack("C*")
log "\t#{message[:bytes].join(",")}"
log "\t#{message[:bytes].map(&:chr).join(" ")}"
end
output_messages << message
return output_messages if message_code == 'Z'
end
end
def log(msg)
return unless @verbose
puts msg
end
def close
@socket.close
end
end

View File

@@ -2,7 +2,6 @@ require 'json'
require 'ostruct'
require_relative 'pgcat_process'
require_relative 'pg_instance'
require_relative 'pg_socket'
class ::Hash
def deep_merge(second)
@@ -13,14 +12,18 @@ end
module Helpers
module Pgcat
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info", secrets=nil)
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
"statement_timeout" => 0,
"username" => "sharding_user"
"username" => "sharding_user",
}
if !secrets.nil?
user["secrets"] = secrets
end
pgcat = PgcatProcess.new(log_level)
primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0")
primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1")
@@ -28,40 +31,28 @@ module Helpers
pgcat_cfg = pgcat.current_config
pgcat_cfg["pools"] = {
"#{pool_name}" => {
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => true,
"query_parser_enabled" => true,
"query_parser_read_write_splitting" => true,
"automatic_sharding_key" => "data.id",
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_i, "primary"]] },
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_i, "primary"]] },
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_i, "primary"]] },
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] },
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
},
"users" => { "0" => user },
"plugins" => {
"intercept" => {
"enabled" => true,
"queries" => {
"0" => {
"query" => "select current_database() as a, current_schemas(false) as b",
"schema" => [
["a", "text"],
["b", "text"],
],
"result" => [
["${DATABASE}", "{public}"],
]
}
}
}
}
}
"users" => { "0" => user }
},
}
if !secrets.nil?
pgcat_cfg["general"]["tls_certificate"] = "../../.circleci/server.cert"
pgcat_cfg["general"]["tls_private_key"] = "../../.circleci/server.key"
end
pgcat.update_config(pgcat_cfg)
pgcat.start
@@ -100,7 +91,7 @@ module Helpers
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_i, "primary"]
["localhost", primary.port.to_s, "primary"]
]
},
},
@@ -119,7 +110,7 @@ module Helpers
end
end
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info", pool_settings={})
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
@@ -135,32 +126,28 @@ module Helpers
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
pool_config = {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_i, "primary"],
["localhost", replica0.port.to_i, "replica"],
["localhost", replica1.port.to_i, "replica"],
["localhost", replica2.port.to_i, "replica"]
]
},
},
"users" => { "0" => user }
}
pool_config = pool_config.merge(pool_settings)
# Main proxy configs
pgcat_cfg["pools"] = {
"#{pool_name}" => pool_config,
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_s, "primary"],
["localhost", replica0.port.to_s, "replica"],
["localhost", replica1.port.to_s, "replica"],
["localhost", replica2.port.to_s, "replica"]
]
},
},
"users" => { "0" => user }
}
}
pgcat_cfg["general"]["port"] = pgcat.port
pgcat.update_config(pgcat_cfg)

View File

@@ -1,10 +1,8 @@
require 'pg'
require 'json'
require 'tempfile'
require 'toml'
require 'fileutils'
require 'securerandom'
class ConfigReloadFailed < StandardError; end
class PgcatProcess
attr_reader :port
attr_reader :pid
@@ -20,7 +18,7 @@ class PgcatProcess
end
def initialize(log_level)
@env = {}
@env = {"RUST_LOG" => log_level}
@port = rand(20000..32760)
@log_level = log_level
@log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log"
@@ -32,7 +30,7 @@ class PgcatProcess
'../../target/debug/pgcat'
end
@command = "#{command_path} #{@config_filename} --log-level #{@log_level}"
@command = "#{command_path} #{@config_filename}"
FileUtils.cp("../../pgcat.toml", @config_filename)
cfg = current_config
@@ -48,34 +46,22 @@ class PgcatProcess
def update_config(config_hash)
@original_config = current_config
Tempfile.create('json_out', '/tmp') do |f|
f.write(config_hash.to_json)
f.flush
`cat #{f.path} | yj -jt > #{@config_filename}`
end
output_to_write = TOML::Generator.new(config_hash).body
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,')
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*\]/, ',\1]')
File.write(@config_filename, output_to_write)
end
def current_config
JSON.parse(`cat #{@config_filename} | yj -tj`)
end
def raw_config_file
File.read(@config_filename)
loadable_string = File.read(@config_filename)
loadable_string = loadable_string.gsub(/,\s*(\d+)\s*,/, ', "\1",')
loadable_string = loadable_string.gsub(/,\s*(\d+)\s*\]/, ', "\1"]')
TOML.load(loadable_string)
end
def reload_config
conn = PG.connect(admin_connection_string)
conn.async_exec("RELOAD")
rescue PG::ConnectionBad => e
errors = logs.split("Reloading config").last
errors = errors.gsub(/\e\[([;\d]+)?m/, '') # Remove color codes
errors = errors.
split("\n").select{|line| line.include?("ERROR") }.
map { |line| line.split("pgcat::config: ").last }
raise ConfigReloadFailed, errors.join("\n")
ensure
conn&.close
`kill -s HUP #{@pid}`
sleep 0.5
end
def start
@@ -92,7 +78,6 @@ class PgcatProcess
10.times do
Process.kill 0, @pid
PG::connect(connection_string || example_connection_string).close
return self
rescue Errno::ESRCH
raise StandardError, "Process #{@pid} died. #{logs}"
@@ -126,16 +111,13 @@ class PgcatProcess
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/pgcat"
end
def connection_string(pool_name, username, password = nil, parameters: {})
def connection_string(pool_name, username, password=nil)
cfg = current_config
user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username }
connection_string = "postgresql://#{username}:#{password || user_obj["password"]}@0.0.0.0:#{@port}/#{pool_name}"
# Add the additional parameters to the connection string
parameter_string = parameters.map { |key, value| "#{key}=#{value}" }.join("&")
connection_string += "?#{parameter_string}" unless parameter_string.empty?
password = if password.nil? then user_obj["password"] else password end
connection_string
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{pool_name}"
end
def example_connection_string

View File

@@ -65,7 +65,7 @@ describe "Least Outstanding Queries Load Balancing" do
processes.pgcat.shutdown
end
context "under homogeneous load" do
context "under homogenous load" do
it "balances query volume between all instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

View File

@@ -11,9 +11,9 @@ describe "Query Mirroing" do
before do
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
]
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
@@ -25,14 +25,13 @@ describe "Query Mirroing" do
processes.pgcat.shutdown
end
xit "can mirror a query" do
it "can mirror a query" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
runs = 15
runs.times { conn.async_exec("SELECT 1 + 2") }
sleep 0.5
expect(processes.all_databases.first.count_select_1_plus_2).to eq(runs)
# Allow some slack in mirroring successes
expect(mirror_pg.count_select_1_plus_2).to be > ((runs - 5) * 3)
expect(mirror_pg.count_select_1_plus_2).to eq(runs * 3)
end
context "when main server connection is closed" do
@@ -43,9 +42,9 @@ describe "Query Mirroing" do
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["idle_timeout"] = 5000 + i
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
]
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config

View File

@@ -221,7 +221,7 @@ describe "Miscellaneous" do
conn.close
end
it "Does not send RESET ALL unless necessary" do
it "Does not send DISCARD ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
@@ -229,7 +229,7 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -239,19 +239,7 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(10)
end
it "Resets server roles correctly" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.async_exec("SET statement_timeout to 5000")
conn.close
end
expect(processes.primary.count_query("RESET ROLE")).to eq(10)
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
end
end
@@ -273,7 +261,7 @@ describe "Miscellaneous" do
end
end
it "Does not send RESET ALL unless necessary" do
it "Does not send DISCARD ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
@@ -282,7 +270,7 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -292,32 +280,8 @@ describe "Miscellaneous" do
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(10)
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
end
it "Respects tracked parameters on startup" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user", parameters: { "application_name" => "my_pgcat_test" }))
expect(conn.async_exec("SHOW application_name")[0]["application_name"]).to eq("my_pgcat_test")
conn.close
end
it "Respect tracked parameter on set statemet" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET application_name to 'my_pgcat_test'")
expect(conn.async_exec("SHOW application_name")[0]["application_name"]).to eq("my_pgcat_test")
end
it "Ignore untracked parameter on set statemet" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
orignal_statement_timeout = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
conn.async_exec("SET statement_timeout to 1500")
expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq(orignal_statement_timeout)
end
end
context "transaction mode with transactions" do
@@ -331,7 +295,7 @@ describe "Miscellaneous" do
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -341,30 +305,7 @@ describe "Miscellaneous" do
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end
end
context "server cleanup disabled" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "random", "info", { "cleanup_server_connections" => false }) }
it "will not clean up connection state" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
processes.primary.reset_stats
conn.async_exec("SET statement_timeout TO 1000")
conn.close
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end
it "will not clean up prepared statements" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
processes.primary.reset_stats
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
conn.close
expect(processes.primary.count_query("RESET ALL")).to eq(0)
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
end
end
end
@@ -374,9 +315,10 @@ describe "Miscellaneous" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
puts(current_configs["general"]["idle_client_in_transaction_timeout"])
current_configs["general"]["idle_client_in_transaction_timeout"] = 0
processes.pgcat.update_config(current_configs) # with timeout 0
processes.pgcat.reload_config
end
@@ -394,9 +336,9 @@ describe "Miscellaneous" do
context "idle transaction timeout set to 500ms" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
current_configs["general"]["idle_client_in_transaction_timeout"] = 500
processes.pgcat.update_config(current_configs) # with timeout 500
processes.pgcat.reload_config
end
@@ -415,7 +357,7 @@ describe "Miscellaneous" do
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(1) # above 500ms
expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
conn.async_exec("SELECT 1") # should be able to send another query
conn.close
end

View File

@@ -1,14 +0,0 @@
require_relative 'spec_helper'
describe "Plugins" do
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
context "intercept" do
it "will intercept an intellij query" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
res = conn.exec("select current_database() as a, current_schemas(false) as b")
expect(res.values).to eq([["sharded_db", "{public}"]])
end
end
end

View File

@@ -1,214 +0,0 @@
require_relative 'spec_helper'
describe 'Prepared statements' do
let(:pool_size) { 5 }
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", pool_size) }
let(:prepared_statements_cache_size) { 100 }
let(:server_round_robin) { false }
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["server_round_robin"] = server_round_robin
new_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = prepared_statements_cache_size
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = pool_size
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
end
context 'when trying prepared statements' do
it 'it allows unparameterized statements to succeed' do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
conn2 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
prepared_query = "SELECT 1"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
conn1.exec_prepared('statement1')
conn2.transaction do
# Claim server 1 with client 2
conn2.exec("SELECT 2")
# Client 1 now runs the prepared query, and it's automatically
# prepared on server 2
conn1.prepare('statement2', prepared_query)
conn1.exec_prepared('statement2')
# Client 2 now prepares the same query that was already
# prepared on server 1. And PgBouncer reuses that already
# prepared query for this different client.
conn2.prepare('statement3', prepared_query)
conn2.exec_prepared('statement3')
end
ensure
conn1.close if conn1
conn2.close if conn2
end
it 'it allows parameterized statements to succeed' do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
conn2 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
prepared_query = "SELECT $1"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
conn1.exec_prepared('statement1', [1])
conn2.transaction do
# Claim server 1 with client 2
conn2.exec("SELECT 2")
# Client 1 now runs the prepared query, and it's automatically
# prepared on server 2
conn1.prepare('statement2', prepared_query)
conn1.exec_prepared('statement2', [1])
# Client 2 now prepares the same query that was already
# prepared on server 1. And PgBouncer reuses that already
# prepared query for this different client.
conn2.prepare('statement3', prepared_query)
conn2.exec_prepared('statement3', [1])
end
ensure
conn1.close if conn1
conn2.close if conn2
end
end
context 'when trying large packets' do
it "works with large parse" do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
long_string = "1" * 4096 * 10
prepared_query = "SELECT '#{long_string}'"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
result = conn1.exec_prepared('statement1')
# assert result matches long_string
expect(result.getvalue(0, 0)).to eq(long_string)
ensure
conn1.close if conn1
end
it "works with large bind" do
conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
long_string = "1" * 4096 * 10
prepared_query = "SELECT $1::text"
# prepare query on server 1 and client 1
conn1.prepare('statement1', prepared_query)
result = conn1.exec_prepared('statement1', [long_string])
# assert result matches long_string
expect(result.getvalue(0, 0)).to eq(long_string)
ensure
conn1.close if conn1
end
end
context 'when statement cache is smaller than set of unqiue statements' do
let(:prepared_statements_cache_size) { 1 }
let(:pool_size) { 1 }
it "evicts all but 1 statement from the server cache" do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
5.times do |i|
prepared_query = "SELECT '#{i}'"
conn.prepare("statement#{i}", prepared_query)
result = conn.exec_prepared("statement#{i}")
expect(result.getvalue(0, 0)).to eq(i.to_s)
end
# Check number of prepared statements (expected: 1)
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(1)
end
end
context 'when statement cache is larger than set of unqiue statements' do
let(:pool_size) { 1 }
it "does not evict any of the statements from the cache" do
# cache size 5
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
5.times do |i|
prepared_query = "SELECT '#{i}'"
conn.prepare("statement#{i}", prepared_query)
result = conn.exec_prepared("statement#{i}")
expect(result.getvalue(0, 0)).to eq(i.to_s)
end
# Check number of prepared statements (expected: 1)
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(5)
end
end
context 'when preparing the same query' do
let(:prepared_statements_cache_size) { 5 }
let(:pool_size) { 5 }
it "reuses statement cache when there are different statement names on the same connection" do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
10.times do |i|
statement_name = "statement_#{i}"
conn.prepare(statement_name, 'SELECT $1::int')
conn.exec_prepared(statement_name, [1])
end
# Check number of prepared statements (expected: 1)
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(1)
end
it "reuses statement cache when there are different statement names on different connections" do
10.times do |i|
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
statement_name = "statement_#{i}"
conn.prepare(statement_name, 'SELECT $1::int')
conn.exec_prepared(statement_name, [1])
end
# Check number of prepared statements (expected: 1)
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(1)
end
end
context 'when reloading config' do
let(:pool_size) { 1 }
it "test_reload_config" do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
# prepare query
conn.prepare('statement1', 'SELECT 1')
conn.exec_prepared('statement1')
# Reload config which triggers pool recreation
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = prepared_statements_cache_size + 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
# check that we're starting with no prepared statements on the server
conn_check = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
n_statements = conn_check.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
expect(n_statements).to eq(0)
# still able to run prepared query
conn.exec_prepared('statement1')
end
end
end

View File

@@ -1,155 +0,0 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "Portocol handling" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "session") }
let(:sequence) { [] }
let(:pgcat_socket) { PostgresSocket.new('localhost', processes.pgcat.port) }
let(:pgdb_socket) { PostgresSocket.new('localhost', processes.all_databases.first.port) }
after do
pgdb_socket.close
pgcat_socket.close
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
def run_comparison(sequence, socket_a, socket_b)
sequence.each do |msg, *args|
socket_a.send(msg, *args)
socket_b.send(msg, *args)
compare_messages(
socket_a.read_from_server,
socket_b.read_from_server
)
end
end
def compare_messages(msg_arr0, msg_arr1)
if msg_arr0.count != msg_arr1.count
error_output = []
error_output << "#{msg_arr0.count} : #{msg_arr1.count}"
error_output << "PgCat Messages"
error_output += msg_arr0.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" }
error_output << "PgServer Messages"
error_output += msg_arr1.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" }
error_desc = error_output.join("\n")
raise StandardError, "Message count mismatch #{error_desc}"
end
(0..msg_arr0.count - 1).all? do |i|
msg0 = msg_arr0[i]
msg1 = msg_arr1[i]
result = [
msg0[:code] == msg1[:code],
msg0[:len] == msg1[:len],
msg0[:bytes] == msg1[:bytes],
].all?
next result if result
if result == false
error_string = []
if msg0[:code] != msg1[:code]
error_string << "code #{msg0[:code]} != #{msg1[:code]}"
end
if msg0[:len] != msg1[:len]
error_string << "len #{msg0[:len]} != #{msg1[:len]}"
end
if msg0[:bytes] != msg1[:bytes]
error_string << "bytes #{msg0[:bytes]} != #{msg1[:bytes]}"
end
err = error_string.join("\n")
raise StandardError, "Message mismatch #{err}"
end
end
end
RSpec.shared_examples "at parity with database" do
before do
pgcat_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
pgdb_socket.send_startup_message("sharding_user", "shard0", "sharding_user")
end
it "works" do
run_comparison(sequence, pgcat_socket, pgdb_socket)
end
end
context "Cancel Query" do
let(:sequence) {
[
[:send_query_message, "SELECT pg_sleep(5)"],
[:cancel_query]
]
}
it_behaves_like "at parity with database"
end
xcontext "Simple query after parse" do
let(:sequence) {
[
[:send_parse_message, "SELECT 5"],
[:send_query_message, "SELECT 1"],
[:send_bind_message],
[:send_describe_message, "P"],
[:send_execute_message],
[:send_sync_message],
]
}
# Known to fail due to PgCat not supporting flush
it_behaves_like "at parity with database"
end
xcontext "Flush message" do
let(:sequence) {
[
[:send_parse_message, "SELECT 1"],
[:send_flush_message]
]
}
# Known to fail due to PgCat not supporting flush
it_behaves_like "at parity with database"
end
xcontext "Bind without parse" do
let(:sequence) {
[
[:send_bind_message]
]
}
# This is known to fail.
# Server responds immediately, Proxy buffers the message
it_behaves_like "at parity with database"
end
context "Simple message" do
let(:sequence) {
[[:send_query_message, "SELECT 1"]]
}
it_behaves_like "at parity with database"
end
context "Extended protocol" do
let(:sequence) {
[
[:send_parse_message, "SELECT 1"],
[:send_bind_message],
[:send_describe_message, "P"],
[:send_execute_message],
[:send_sync_message],
]
}
it_behaves_like "at parity with database"
end
end

View File

@@ -7,11 +7,11 @@ describe "Sharding" do
before do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
# Setup the sharding data
3.times do |i|
conn.exec("SET SHARD TO '#{i}'")
conn.exec("DELETE FROM data WHERE id > 0") rescue nil
conn.exec("DELETE FROM data WHERE id > 0")
end
18.times do |i|
@@ -19,16 +19,15 @@ describe "Sharding" do
conn.exec("SET SHARDING KEY TO '#{i}'")
conn.exec("INSERT INTO data (id, value) VALUES (#{i}, 'value_#{i}')")
end
conn.close
end
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "automatic routing of extended protocol" do
describe "automatic routing of extended procotol" do
it "can do it" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.exec("SET SERVER ROLE TO 'auto'")
@@ -49,148 +48,4 @@ describe "Sharding" do
end
end
end
describe "no_shard_specified_behavior config" do
context "when default shard number is invalid" do
it "prevents config reload" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
current_configs = processes.pgcat.current_config
current_configs["pools"]["sharded_db"]["default_shard"] = "shard_99"
processes.pgcat.update_config(current_configs)
expect { processes.pgcat.reload_config }.to raise_error(ConfigReloadFailed, /Invalid shard 99/)
end
end
end
describe "comment-based routing" do
context "when no configs are set" do
it "routes queries with a shard_id comment to the default shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
10.times { conn.async_exec("/* shard_id: 2 */ SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([10, 0, 0])
end
it "does not honor no_shard_specified_behavior directives" do
end
end
[
["shard_id_regex", "/\\* the_shard_id: (\\d+) \\*/", "/* the_shard_id: 1 */"],
["sharding_key_regex", "/\\* the_sharding_key: (\\d+) \\*/", "/* the_sharding_key: 3 */"],
].each do |config_name, config_value, comment_to_use|
context "when #{config_name} config is set" do
let(:no_shard_specified_behavior) { nil }
before do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
current_configs = processes.pgcat.current_config
current_configs["pools"]["sharded_db"][config_name] = config_value
if no_shard_specified_behavior
current_configs["pools"]["sharded_db"]["default_shard"] = no_shard_specified_behavior
else
current_configs["pools"]["sharded_db"].delete("default_shard")
end
processes.pgcat.update_config(current_configs)
processes.pgcat.reload_config
end
it "routes queries with a shard_id comment to the correct shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
context "when no_shard_specified_behavior config is set to random" do
let(:no_shard_specified_behavior) { "random" }
context "with no shard comment" do
it "sends queries to random shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2).all?(&:positive?)).to be true
end
end
context "with a shard comment" do
it "honors the comment" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
end
end
context "when no_shard_specified_behavior config is set to random_healthy" do
let(:no_shard_specified_behavior) { "random_healthy" }
context "with no shard comment" do
it "sends queries to random healthy shard" do
good_databases = [processes.all_databases[0], processes.all_databases[2]]
bad_database = processes.all_databases[1]
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
250.times { conn.async_exec("SELECT 99") }
bad_database.take_down do
250.times do
conn.async_exec("SELECT 99")
rescue PG::ConnectionBad => e
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
end
end
# Routes traffic away from bad shard
25.times { conn.async_exec("SELECT 1 + 2") }
expect(good_databases.map(&:count_select_1_plus_2).all?(&:positive?)).to be true
expect(bad_database.count_select_1_plus_2).to eq(0)
# Routes traffic to the bad shard if the shard_id is specified
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
bad_database = processes.all_databases[1]
expect(bad_database.count_select_1_plus_2).to eq(25)
end
end
context "with a shard comment" do
it "honors the comment" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
end
end
context "when no_shard_specified_behavior config is set to shard_x" do
let(:no_shard_specified_behavior) { "shard_2" }
context "with no shard comment" do
it "sends queries to the specified shard" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 0, 25])
end
end
context "with a shard comment" do
it "honors the comment" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
25.times { conn.async_exec("#{comment_to_use} SELECT 1 + 2") }
expect(processes.all_databases.map(&:count_select_1_plus_2)).to eq([0, 25, 0])
end
end
end
end
end
end
end

View File

@@ -1,403 +0,0 @@
# frozen_string_literal: true
require 'open3'
require_relative 'spec_helper'
describe "Stats" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "SHOW STATS" do
context "clients connect and make one query" do
it "updates *_query_time and *_wait_time" do
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
end
sleep(1)
connections.map(&:close)
# wait for averages to be calculated, we shouldn't do this too often
sleep(15.5)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
expect(results["total_query_time"].to_i).to be_within(200).of(750)
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
end
end
end
describe "SHOW POOLS" do
context "bad credentials" do
it "does not change any stats" do
bad_password_url = URI(pgcat_conn_str)
bad_password_url.password = "wrong"
expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "bad database name" do
it "does not change any stats" do
bad_db_url = URI(pgcat_conn_str)
bad_db_url.path = "/wrong_db"
expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "client connects but issues no queries" do
it "only affects cl_idle stats" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
sleep(1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("20")
expect(results["sv_idle"]).to eq(before_test)
connections.map(&:close)
sleep(1.1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq(before_test)
end
end
context "clients connect and make one query" do
it "only affects cl_idle, sv_idle stats" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(2.5)") }
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_active"]).to eq("5")
expect(results["sv_active"]).to eq("5")
sleep(3)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("5")
connections.map(&:close)
sleep(1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client connects and opens a transaction and closes connection uncleanly" do
it "produces correct statistics" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new do
c.async_exec("BEGIN")
c.async_exec("SELECT pg_sleep(0.01)")
c.close
end
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client fail to checkout connection from the pool" do
it "counts clients as idle" do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
threads = []
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError }
end
sleep(2)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("1")
threads.map(&:join)
connections.map(&:close)
end
end
context "clients connects and disconnect normally" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it 'shows the same number of clients before and after' do
clients_before = clients_connected_to_pool(processes: processes)
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") rescue nil }
end
clients_between = clients_connected_to_pool(processes: processes)
expect(clients_before).not_to eq(clients_between)
connections.each(&:close)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients connects and disconnect abruptly" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
it 'shows the same number of clients before and after' do
threads = []
connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
clients_before = clients_connected_to_pool(processes: processes)
random_string = (0...8).map { (65 + rand(26)).chr }.join
connection_string = "#{pgcat_conn_str}?application_name=#{random_string}"
faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null")
sleep(1)
# psql starts two processes, we only know the pid of the parent, this
# ensure both are killed
`pkill -9 -f '#{random_string}'`
Process.wait(faulty_client)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients overwhelm server pools" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "cl_waiting is updated to show it" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end
sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_waiting"]).to eq("2")
expect(results["cl_active"]).to eq("2")
expect(results["sv_active"]).to eq("2")
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("4")
expect(results["sv_idle"]).to eq("2")
threads.map(&:join)
connections.map(&:close)
end
it "show correct max_wait" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
connections.map(&:close)
sleep(4.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("0")
threads.map(&:join)
end
end
end
describe "SHOW CLIENTS" do
it "reports correct number and application names" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") }
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(21) # count admin clients
expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8)
expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1)
connections[0..5].map(&:close)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(15)
connections[6..].map(&:close)
sleep(1) # Wait for stats to be updated
expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1)
admin_conn.close
end
it "reports correct number of queries and transactions" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") }
connections.each do |c|
c.async_exec("SELECT 1")
c.async_exec("SELECT 2")
c.async_exec("SELECT 3")
c.async_exec("BEGIN")
c.async_exec("SELECT 4")
c.async_exec("SELECT 5")
c.async_exec("COMMIT")
end
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(3)
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
expect(normal_client_results[0]["transaction_count"]).to eq("4")
expect(normal_client_results[1]["transaction_count"]).to eq("4")
expect(normal_client_results[0]["query_count"]).to eq("7")
expect(normal_client_results[1]["query_count"]).to eq("7")
admin_conn.close
connections.map(&:close)
end
context "when client has waited for a server" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "shows correct maxwait" do
threads = []
connections = Array.new(3) { |i| PG::connect("#{pgcat_conn_str}?application_name=app#{i}") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW CLIENTS")
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
non_waiting_clients = normal_client_results.select { |c| c["maxwait"] == "0" }
waiting_clients = normal_client_results.select { |c| c["maxwait"].to_i > 0 }
expect(non_waiting_clients.count).to eq(2)
non_waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_between(0, 50_000)
end
expect(waiting_clients.count).to eq(1)
waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_within(200_000).of(500_000)
end
admin_conn.close
connections.map(&:close)
end
end
end
describe "Query Storm" do
context "when the proxy receives overwhelmingly large number of short quick queries" do
it "should not have lingering clients or active servers" do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
Array.new(40) do
Thread.new do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SELECT pg_sleep(0.1)")
rescue PG::SystemError
ensure
conn.close
end
end.each(&:join)
sleep 1
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_waiting cl_cancel_req sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
admin_conn.close
end
end
end
end

View File

@@ -1 +0,0 @@
target/

1322
tests/rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +0,0 @@
[package]
name = "rust"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
sqlx = { version = "0.6.2", features = [ "runtime-tokio-rustls", "postgres", "json", "tls", "migrate", "time", "uuid", "ipnetwork"] }
tokio = { version = "1", features = ["full"] }

View File

@@ -1,36 +0,0 @@
#[tokio::main]
async fn main() {
test_prepared_statements().await;
}
async fn test_prepared_statements() {
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db")
.await
.unwrap();
let mut handles = Vec::new();
for _ in 0..5 {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for _ in 0..1000 {
match sqlx::query("SELECT one").fetch_all(&pool).await {
Ok(_) => (),
Err(err) => {
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err);
}
}
}
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}

View File

@@ -1,40 +0,0 @@
#!/bin/bash
#
# Build an Ubuntu deb.
#
script_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
deb_dir="/tmp/pgcat-build"
export PACKAGE_VERSION=${1:-"1.1.1"}
if [[ $(arch) == "x86_64" ]]; then
export ARCH=amd64
else
export ARCH=arm64
fi
cd "$script_dir/.."
cargo build --release
rm -rf "$deb_dir"
mkdir -p "$deb_dir/DEBIAN"
mkdir -p "$deb_dir/usr/bin"
mkdir -p "$deb_dir/etc/systemd/system"
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
chmod +x "$deb_dir/usr/bin/pgcat"
cp pgcat.toml "$deb_dir/etc/pgcat.example.toml"
cp pgcat.service "$deb_dir/etc/systemd/system/pgcat.service"
(cat control | envsubst) > "$deb_dir/DEBIAN/control"
cp postinst "$deb_dir/DEBIAN/postinst"
cp postrm "$deb_dir/DEBIAN/postrm"
cp prerm "$deb_dir/DEBIAN/prerm"
chmod +x ${deb_dir}/DEBIAN/post*
chmod +x ${deb_dir}/DEBIAN/pre*
dpkg-deb \
--root-owner-group \
-z1 \
--build "$deb_dir" \
pgcat-${PACKAGE_VERSION}-ubuntu22.04-${ARCH}.deb

View File

@@ -1 +0,0 @@
tomli