Compare commits

..

3 Commits

Author SHA1 Message Date
Lev
7a419f40ea Revert "Require a reason when marking a server bad (#654)"
This reverts commit 4dbef49ec9.
2023-12-04 19:59:53 -08:00
Lev
54c4ad140d Revert "Not sure how this sneaked past CI"
This reverts commit 4c5498b915.
2023-12-04 19:59:42 -08:00
Lev
190e32ae85 Revert "Reset wait times when checked out successfully (#656)"
This reverts commit ec3920d60f.
2023-12-04 19:59:34 -08:00
50 changed files with 610 additions and 4272 deletions

View File

@@ -59,7 +59,6 @@ admin_password = "admin_pass"
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
prepared_statements_cache_size = 500
# If the client doesn't specify, route traffic to
# this role by default.
@@ -142,7 +141,6 @@ query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"
prepared_statements_cache_size = 500
[pools.simple_db.users.0]
username = "simple_user"

View File

@@ -10,7 +10,3 @@ updates:
commit-message:
prefix: "chore(deps)"
open-pull-requests-limit: 10
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"

View File

@@ -2,9 +2,7 @@ name: Build and Push
on:
push:
paths:
- '!charts/**.md'
branches:
branches:
- main
tags:
- v*
@@ -23,17 +21,14 @@ jobs:
steps:
- name: Checkout Repository
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@v2
- name: Determine tags
id: metadata
uses: docker/metadata-action@v5
uses: docker/metadata-action@v4
with:
images: ${{ env.registry }}/${{ env.image-name }}
tags: |
@@ -45,18 +40,15 @@ jobs:
type=raw,value=latest,enable={{ is_default_branch }}
- name: Log in to the Container registry
uses: docker/login-action@v3
uses: docker/login-action@v2.1.0
with:
registry: ${{ env.registry }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push ${{ env.image-name }}
uses: docker/build-push-action@v6
uses: docker/build-push-action@v3
with:
context: .
platforms: linux/amd64,linux/arm64
provenance: false
push: true
tags: ${{ steps.metadata.outputs.tags }}
labels: ${{ steps.metadata.outputs.labels }}

View File

@@ -1,50 +0,0 @@
name: Lint and Test Charts
on:
pull_request:
paths:
- charts/**
- '!charts/**.md'
jobs:
lint-test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3.1.0
with:
fetch-depth: 0
- name: Set up Helm
uses: azure/setup-helm@v3
with:
version: v3.8.1
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
# yamllint (https://github.com/adrienverge/yamllint) which require Python
- name: Set up Python
uses: actions/setup-python@v4.1.0
with:
python-version: 3.7
- name: Set up chart-testing
uses: helm/chart-testing-action@v2.2.1
with:
version: v3.5.1
- name: Run chart-testing (list-changed)
id: list-changed
run: |
changed=$(ct list-changed --config ct.yaml)
if [[ -n "$changed" ]]; then
echo "changed=true" >> $GITHUB_OUTPUT
fi
- name: Run chart-testing (lint)
run: ct lint --config ct.yaml
- name: Create kind cluster
uses: helm/kind-action@v1.7.0
if: steps.list-changed.outputs.changed == 'true'
- name: Run chart-testing (install)
run: ct install --config ct.yaml

View File

@@ -1,40 +0,0 @@
name: Release Charts
on:
push:
paths:
- charts/**
- '!**.md'
branches:
- main
jobs:
release:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0
with:
fetch-depth: 0
- name: Configure Git
run: |
git config user.name "$GITHUB_ACTOR"
git config user.email "$GITHUB_ACTOR@users.noreply.github.com"
- name: Install Helm
uses: azure/setup-helm@5119fcb9089d432beecbf79bb2c7915207344b78 # v3.5
with:
version: v3.13.0
- name: Run chart-releaser
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
with:
charts_dir: charts
config: cr.yaml
env:
CR_TOKEN: "${{ secrets.GITHUB_TOKEN }}"

View File

@@ -1,48 +0,0 @@
name: '[CI/CD] Update README metadata'
on:
pull_request_target:
branches:
- main
paths:
- 'charts/*/values.yaml'
# Remove all permissions by default
permissions: {}
jobs:
update-readme-metadata:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Install readme-generator-for-helm
run: npm install -g @bitnami/readme-generator-for-helm
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608
with:
path: charts
ref: ${{github.event.pull_request.head.ref}}
repository: ${{github.event.pull_request.head.repo.full_name}}
token: ${{ secrets.GITHUB_TOKEN }}
- name: Execute readme-generator-for-helm
env:
DIFF_URL: "${{github.event.pull_request.diff_url}}"
TEMP_FILE: "${{runner.temp}}/pr-${{github.event.number}}.diff"
run: |
# This request doesn't consume API calls.
curl -Lkso $TEMP_FILE $DIFF_URL
files_changed="$(sed -nr 's/[\-\+]{3} [ab]\/(.*)/\1/p' $TEMP_FILE | sort | uniq)"
# Adding || true to avoid "Process exited with code 1" errors
charts_dirs_changed="$(echo "$files_changed" | xargs dirname | grep -o "pgcat/[^/]*" | sort | uniq || true)"
for chart in ${charts_dirs_changed}; do
echo "Updating README.md for ${chart}"
readme-generator --values "charts/${chart}/values.yaml" --readme "charts/${chart}/README.md" --schema "/tmp/schema.json"
done
- name: Push changes
run: |
# Push all the changes
cd charts
if git status -s | grep pgcat; then
git config user.name "$GITHUB_ACTOR"
git config user.email "$GITHUB_ACTOR@users.noreply.github.com"
git add . && git commit -am "Update README.md with readme-generator-for-helm" --signoff && git push
fi

View File

@@ -6,32 +6,6 @@ Thank you for contributing! Just a few tips here:
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
## How to run the integration tests locally and iterate on them
We have integration tests written in Ruby, Python, Go and Rust.
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
Quite simply, make sure you have docker installed and then run
`./start_test_env.sh`
That is it!
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app && python3 tests/python/tests.py`
Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
To rebuild PgCat, just run `cargo build` within the container under `/app`
![Animated gif showing how to run tests](https://github.com/user-attachments/assets/2258fde3-2aed-4efb-bdc5-e4f12dcd4d33)
Happy hacking!
## TODOs

104
Cargo.lock generated
View File

@@ -146,12 +146,6 @@ dependencies = [
"syn 2.0.26",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atomic_enum"
version = "0.2.0"
@@ -548,23 +542,29 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
[[package]]
name = "h2"
version = "0.4.6"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"indexmap 1.9.3",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.0"
@@ -609,9 +609,9 @@ dependencies = [
[[package]]
name = "http"
version = "1.1.0"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
dependencies = [
"bytes",
"fnv",
@@ -620,24 +620,12 @@ dependencies = [
[[package]]
name = "http-body"
version = "1.0.1"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "http-body-util"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
dependencies = [
"bytes",
"futures-util",
"http",
"http-body",
"pin-project-lite",
]
@@ -655,12 +643,13 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "1.4.1"
version = "0.14.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
@@ -669,26 +658,13 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"smallvec",
"socket2 0.4.9",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper-util"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
dependencies = [
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"pin-project-lite",
"tokio",
]
[[package]]
name = "iana-time-zone"
version = "0.1.57"
@@ -733,6 +709,16 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]]
name = "indexmap"
version = "2.0.0"
@@ -740,7 +726,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.14.0",
]
[[package]]
@@ -862,7 +848,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
dependencies = [
"hashbrown",
"hashbrown 0.14.0",
]
[[package]]
@@ -1034,7 +1020,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.2.0"
version = "1.1.2-dev4"
dependencies = [
"arc-swap",
"async-trait",
@@ -1048,9 +1034,7 @@ dependencies = [
"fallible-iterator",
"futures",
"hmac",
"http-body-util",
"hyper",
"hyper-util",
"itertools",
"jemallocator",
"log",
@@ -1494,9 +1478,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.13.2"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
[[package]]
name = "socket2"
@@ -1526,9 +1510,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.41.0"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
dependencies = [
"log",
"sqlparser_derive",
@@ -1536,13 +1520,13 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.2.2"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 1.0.109",
]
[[package]]
@@ -1757,13 +1741,19 @@ version = "0.19.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
dependencies = [
"indexmap",
"indexmap 2.0.0",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
]
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.37"

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.2.0"
version = "1.1.2-dev4"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = { version = "0.41", features = ["visitor"] }
sqlparser = {version = "0.34", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"
@@ -29,9 +29,7 @@ base64 = "0.21"
stringprep = "0.1"
tokio-rustls = "0.24"
rustls-pemfile = "1"
http-body-util = "0.1.2"
hyper = { version = "1.4.1", features = ["full"] }
hyper-util = { version = "0.1.7", features = ["tokio"] }
hyper = { version = "0.14", features = ["full"] }
phf = { version = "0.11.1", features = ["macros"] }
exitcode = "1.1.2"
futures = "0.3"
@@ -49,12 +47,9 @@ serde_json = "1"
itertools = "0.10"
clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = [
"json",
"env-filter",
"std",
] }
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
lru = "0.12.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -1,4 +1,4 @@
FROM rust:1.79.0-slim-bookworm AS builder
FROM rust:1-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y build-essential
@@ -19,4 +19,3 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat
ENV RUST_LOG=info
CMD ["pgcat"]
STOPSIGNAL SIGINT

View File

@@ -1,4 +1,4 @@
FROM cimg/rust:1.79.0
FROM cimg/rust:1.67.1
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN sudo apt-get update && \

View File

@@ -268,8 +268,6 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
### Live configuration reloading
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.

View File

@@ -1,23 +0,0 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -1,8 +0,0 @@
apiVersion: v2
name: pgcat
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
maintainers:
- name: Wildcard
email: support@w6d.io
appVersion: "1.2.0"
version: 0.2.0

View File

@@ -1,22 +0,0 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
{{- range .paths }}
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "pgcat.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "pgcat.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "pgcat.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "pgcat.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}

View File

@@ -1,3 +0,0 @@
{{/*
Configuration template definition
*/}}

View File

@@ -1,62 +0,0 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "pgcat.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "pgcat.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "pgcat.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "pgcat.labels" -}}
helm.sh/chart: {{ include "pgcat.chart" . }}
{{ include "pgcat.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "pgcat.selectorLabels" -}}
app.kubernetes.io/name: {{ include "pgcat.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "pgcat.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "pgcat.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View File

@@ -1,66 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "pgcat.fullname" . }}
labels:
{{- include "pgcat.labels" . | nindent 4 }}
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
{{- include "pgcat.selectorLabels" . | nindent 6 }}
template:
metadata:
annotations:
checksum/secret: {{ include (print $.Template.BasePath "/secret.yaml") . | sha256sum }}
{{- with .Values.podAnnotations }}
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "pgcat.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.image.pullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "pgcat.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.containerSecurityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: pgcat
containerPort: {{ .Values.configuration.general.port }}
protocol: TCP
livenessProbe:
tcpSocket:
port: pgcat
readinessProbe:
tcpSocket:
port: pgcat
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:
- mountPath: /etc/pgcat
name: config
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
volumes:
- secret:
defaultMode: 420
secretName: {{ include "pgcat.fullname" . }}
name: config

View File

@@ -1,61 +0,0 @@
{{- if .Values.ingress.enabled -}}
{{- $fullName := include "pgcat.fullname" . -}}
{{- $svcPort := .Values.service.port -}}
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
{{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
{{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}}
{{- end }}
{{- end }}
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1beta1
{{- else -}}
apiVersion: extensions/v1beta1
{{- end }}
kind: Ingress
metadata:
name: {{ $fullName }}
labels:
{{- include "pgcat.labels" . | nindent 4 }}
{{- with .Values.ingress.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
{{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }}
ingressClassName: {{ .Values.ingress.className }}
{{- end }}
{{- if .Values.ingress.tls }}
tls:
{{- range .Values.ingress.tls }}
- hosts:
{{- range .hosts }}
- {{ . | quote }}
{{- end }}
secretName: {{ .secretName }}
{{- end }}
{{- end }}
rules:
{{- range .Values.ingress.hosts }}
- host: {{ .host | quote }}
http:
paths:
{{- range .paths }}
- path: {{ .path }}
{{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }}
pathType: {{ .pathType }}
{{- end }}
backend:
{{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
service:
name: {{ $fullName }}
port:
number: {{ $svcPort }}
{{- else }}
serviceName: {{ $fullName }}
servicePort: {{ $svcPort }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -1,86 +0,0 @@
apiVersion: v1
kind: Secret
metadata:
name: {{ include "pgcat.fullname" . }}
labels:
{{- include "pgcat.labels" . | nindent 4 }}
type: Opaque
stringData:
pgcat.toml: |
[general]
host = {{ .Values.configuration.general.host | quote }}
port = {{ .Values.configuration.general.port }}
enable_prometheus_exporter = {{ .Values.configuration.general.enable_prometheus_exporter }}
prometheus_exporter_port = {{ .Values.configuration.general.prometheus_exporter_port }}
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
shutdown_timeout = {{ .Values.configuration.general.shutdown_timeout }}
ban_time = {{ .Values.configuration.general.ban_time }}
log_client_connections = {{ .Values.configuration.general.log_client_connections }}
log_client_disconnections = {{ .Values.configuration.general.log_client_disconnections }}
tcp_keepalives_idle = {{ .Values.configuration.general.tcp_keepalives_idle }}
tcp_keepalives_count = {{ .Values.configuration.general.tcp_keepalives_count }}
tcp_keepalives_interval = {{ .Values.configuration.general.tcp_keepalives_interval }}
{{- if and (ne .Values.configuration.general.tls_certificate "-") (ne .Values.configuration.general.tls_private_key "-") }}
tls_certificate = "{{ .Values.configuration.general.tls_certificate }}"
tls_private_key = "{{ .Values.configuration.general.tls_private_key }}"
{{- end }}
admin_username = {{ .Values.configuration.general.admin_username | quote }}
admin_password = {{ .Values.configuration.general.admin_password | quote }}
{{- if and .Values.configuration.general.auth_query_user .Values.configuration.general.auth_query_password .Values.configuration.general.auth_query }}
auth_query = {{ .Values.configuration.general.auth_query | quote }}
auth_query_user = {{ .Values.configuration.general.auth_query_user | quote }}
auth_query_password = {{ .Values.configuration.general.auth_query_password | quote }}
{{- end }}
{{- range $pool := .Values.configuration.pools }}
##
## pool for {{ $pool.name }}
##
[pools.{{ $pool.name | quote }}]
pool_mode = {{ default "transaction" $pool.pool_mode | quote }}
load_balancing_mode = {{ default "random" $pool.load_balancing_mode | quote }}
default_role = {{ default "any" $pool.default_role | quote }}
prepared_statements_cache_size = {{ default 500 $pool.prepared_statements_cache_size }}
query_parser_enabled = {{ default true $pool.query_parser_enabled }}
query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }}
primary_reads_enabled = {{ default true $pool.primary_reads_enabled }}
sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }}
{{- range $index, $user := $pool.users }}
## pool {{ $pool.name }} user {{ $user.username | quote }}
##
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
username = {{ $user.username | quote }}
password = {{ $user.password | quote }}
pool_size = {{ $user.pool_size }}
statement_timeout = {{ $user.statement_timeout }}
min_pool_size = 3
server_lifetime = 60000
{{- if and $user.server_username $user.server_password }}
server_username = {{ $user.server_username | quote }}
server_password = {{ $user.server_password | quote }}
{{- end }}
{{- end }}
{{- range $index, $shard := $pool.shards }}
## pool {{ $pool.name }} database {{ $shard.database }}
##
[pools.{{ $pool.name | quote }}.shards.{{ $index }}]
{{- if gt (len $shard.servers) 0}}
servers = [
{{- range $server := $shard.servers }}
[ {{ $server.host | quote }}, {{ $server.port }}, {{ $server.role | quote }} ],
{{- end }}
]
{{- end }}
database = {{ $shard.database | quote }}
{{- end }}
{{- end }}

View File

@@ -1,15 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "pgcat.fullname" . }}
labels:
{{- include "pgcat.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: pgcat
protocol: TCP
name: pgcat
selector:
{{- include "pgcat.selectorLabels" . | nindent 4 }}

View File

@@ -1,12 +0,0 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "pgcat.serviceAccountName" . }}
labels:
{{- include "pgcat.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}

View File

@@ -1,369 +0,0 @@
## String to partially override aspnet-core.fullname template (will maintain the release name)
## @param nameOverride String to partially override common.names.fullname
##
nameOverride: ""
## String to fully override aspnet-core.fullname template
## @param fullnameOverride String to fully override common.names.fullname
##
fullnameOverride: ""
## Number of PgCat replicas to deploy
## @param replicaCount Number of PgCat replicas to deploy
replicaCount: 1
## Bitnami PgCat image version
## ref: https://hub.docker.com/r/bitnami/kubewatch/tags/
##
## @param image.registry PgCat image registry
## @param image.repository PgCat image name
## @param image.tag PgCat image tag
## @param image.pullPolicy PgCat image tag
## @param image.pullSecrets Specify docker-registry secret names as an array
image:
repository: ghcr.io/postgresml/pgcat
# Overrides the image tag whose default is the chart appVersion.
tag: "main"
## Specify a imagePullPolicy
## Defaults to 'Always' if image tag is 'latest', else set to 'IfNotPresent'
## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images
##
pullPolicy: IfNotPresent
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
## Example:
## pullSecrets:
## - myRegistryKeySecretName
##
pullSecrets: []
## Specifies whether a ServiceAccount should be created
##
## @param serviceAccount.create Enable the creation of a ServiceAccount for PgCat pods
## @param serviceAccount.name Name of the created ServiceAccount
##
serviceAccount:
## Specifies whether a service account should be created
create: true
## Annotations to add to the service account
annotations: {}
## The name of the service account to use.
## If not set and create is true, a name is generated using the fullname template
name: ""
## Annotations for server pods.
## ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
##
## @param podAnnotations Annotations for PgCat pods
##
podAnnotations: {}
## PgCat containers' SecurityContext
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod
##
## @param podSecurityContext.enabled Enabled PgCat pods' Security Context
## @param podSecurityContext.fsGroup Set PgCat pod's Security Context fsGroup
##
podSecurityContext: {}
# fsGroup: 2000
## PgCat pods' Security Context
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-container
##
## @param containerSecurityContext.enabled Enabled PgCat containers' Security Context
## @param containerSecurityContext.runAsUser Set PgCat container's Security Context runAsUser
## @param containerSecurityContext.runAsNonRoot Set PgCat container's Security Context runAsNonRoot
##
containerSecurityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
## PgCat service
##
## @param service.type PgCat service type
## @param service.port PgCat service port
service:
type: ClusterIP
port: 6432
ingress:
enabled: false
className: ""
annotations: {}
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
# - chart-example.local
## PgCat resource requests and limits
## ref: http://kubernetes.io/docs/user-guide/compute-resources/
##
## @skip resources Optional description
## @disabled-param resources.limits The resources limits for the PgCat container
## @disabled-param resources.requests The requested resources for the PgCat container
##
resources:
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
limits: {}
# cpu: 100m
# memory: 128Mi
requests: {}
# cpu: 100m
# memory: 128Mi
## Node labels for pod assignment. Evaluated as a template.
## ref: https://kubernetes.io/docs/user-guide/node-selection/
##
## @param nodeSelector Node labels for pod assignment
##
nodeSelector: {}
## Tolerations for pod assignment. Evaluated as a template.
## ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
##
## @param tolerations Tolerations for pod assignment
##
tolerations: []
## Affinity for pod assignment. Evaluated as a template.
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
## Note: podAffinityPreset, podAntiAffinityPreset, and nodeAffinityPreset will be ignored when it's set
##
## @param affinity Affinity for pod assignment
##
affinity: {}
## PgCat configuration
## @param configuration [object]
configuration:
## General pooler settings
## @param [object]
general:
## @param configuration.general.host What IP to run on, 0.0.0.0 means accessible from everywhere.
host: "0.0.0.0"
## @param configuration.general.port Port to run on, same as PgBouncer used in this example.
port: 6432
## @param configuration.general.enable_prometheus_exporter Whether to enable prometheus exporter or not.
enable_prometheus_exporter: false
## @param configuration.general.prometheus_exporter_port Port at which prometheus exporter listens on.
prometheus_exporter_port: 9930
# @param configuration.general.connect_timeout How long to wait before aborting a server connection (ms).
connect_timeout: 5000
# How long an idle connection with a server is left open (ms).
idle_timeout: 30000 # milliseconds
# Max connection lifetime before it's closed, even if actively used.
server_lifetime: 86400000 # 24 hours
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout: 0 # milliseconds
# @param configuration.general.healthcheck_timeout How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout: 1000
# @param configuration.general.healthcheck_delay How long to keep connection available for immediate re-use, without running a healthcheck query on it
healthcheck_delay: 30000
# @param configuration.general.shutdown_timeout How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout: 60000
# @param configuration.general.ban_time For how long to ban a server if it fails a health check (seconds).
ban_time: 60 # seconds
# @param configuration.general.log_client_connections If we should log client connections
log_client_connections: false
# @param configuration.general.log_client_disconnections If we should log client disconnections
log_client_disconnections: false
# TLS
# tls_certificate: "server.cert"
# tls_private_key: "server.key"
tls_certificate: "-"
tls_private_key: "-"
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
admin_username: "postgres"
admin_password: "postgres"
# Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
# established using the database configured in the pool. This parameter is inherited by every pool and
# can be redefined in pool configuration.
auth_query: null
# User to be used for connecting to servers to obtain the hash used for md5 authentication by sending
# the query specified in auth_query_user. The connection will be established using the database configured
# in the pool. This parameter is inherited by every pool and can be redefined in pool configuration.
#
# @param configuration.general.auth_query_user
auth_query_user: null
# Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending
# the query specified in auth_query_user. The connection will be established using the database configured
# in the pool. This parameter is inherited by every pool and can be redefined in pool configuration.
#
# @param configuration.general.auth_query_password
auth_query_password: null
# Number of seconds of connection idleness to wait before sending a keepalive packet to the server.
tcp_keepalives_idle: 5
# Number of unacknowledged keepalive packets allowed before giving up and closing the connection.
tcp_keepalives_count: 5
# Number of seconds between keepalive packets.
tcp_keepalives_interval: 5
## pool
## configs are structured as pool.<pool_name>
## the pool_name is what clients use as database name when connecting
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
## @param [object]
pools:
[{
name: "simple", pool_mode: "transaction",
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
shards: [{
servers: [{host: "postgres", port: 5432, role: "primary"}],
database: "postgres"
}]
}]
# - ## default values
# ##
# ##
# ##
# name: "db"
# ## Pool mode (see PgBouncer docs for more).
# ## session: one server connection per connected client
# ## transaction: one server connection per client transaction
# ## @param configuration.poolsPostgres.pool_mode
# pool_mode: "transaction"
# ## Load balancing mode
# ## `random` selects the server at random
# ## `loc` selects the server with the least outstanding busy connections
# ##
# ## @param configuration.poolsPostgres.load_balancing_mode
# load_balancing_mode: "random"
# ## Prepared statements cache size.
# ## TODO: update documentation
# ##
# ## @param configuration.poolsPostgres.prepared_statements_cache_size
# prepared_statements_cache_size: 500
# ## If the client doesn't specify, route traffic to
# ## this role by default.
# ##
# ## any: round-robin between primary and replicas,
# ## replica: round-robin between replicas only without touching the primary,
# ## primary: all queries go to the primary unless otherwise specified.
# ## @param configuration.poolsPostgres.default_role
# default_role: "any"
# ## Query parser. If enabled, we'll attempt to parse
# ## every incoming query to determine if it's a read or a write.
# ## If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# ## we'll direct it to the primary.
# ## @param configuration.poolsPostgres.query_parser_enabled
# query_parser_enabled: true
# ## If the query parser is enabled and this setting is enabled, we'll attempt to
# ## infer the role from the query itself.
# ## @param configuration.poolsPostgres.query_parser_read_write_splitting
# query_parser_read_write_splitting: true
# ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# ## load balancing of read queries. Otherwise, the primary will only be used for write
# ## queries. The primary can always be explicitly selected with our custom protocol.
# ## @param configuration.poolsPostgres.primary_reads_enabled
# primary_reads_enabled: true
# ## So what if you wanted to implement a different hashing function,
# ## or you've already built one and you want this pooler to use it?
# ##
# ## Current options:
# ##
# ## pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# ## sha1: A hashing function based on SHA1
# ##
# ## @param configuration.poolsPostgres.sharding_function
# sharding_function: "pg_bigint_hash"
# ## Credentials for users that may connect to this cluster
# ## @param users [array]
# ## @param users[0].username Name of the env var (required)
# ## @param users[0].password Value for the env var (required)
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
# users: []
# # - username: "user"
# # password: "pass"
# #
# # # The maximum number of connection from a single Pgcat process to any database in the cluster
# # # is the sum of pool_size across all users.
# # pool_size: 9
# #
# # # Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
# # statement_timeout: 0
# #
# # # PostgreSQL username used to connect to the server.
# # server_username: "postgres
# #
# # # PostgreSQL password used to connect to the server.
# # server_password: "postgres
# ## @param shards [array]
# ## @param shards[0].server[0].host Host for this shard
# ## @param shards[0].server[0].port Port for this shard
# ## @param shards[0].server[0].role Role for this shard
# shards: []
# # [ host, port, role ]
# # - servers:
# # - host: "postgres"
# # port: 5432
# # role: "primary"
# # - host: "postgres"
# # port: 5432
# # role: "replica"
# # database: "postgres"
# # # [ host, port, role ]
# # - servers:
# # - host: "postgres"
# # port: 5432
# # role: "primary"
# # - host: "postgres"
# # port: 5432
# # role: "replica"
# # database: "postgres"
# # # [ host, port, role ]
# # - servers:
# # - host: "postgres"
# # port: 5432
# # role: "primary"
# # - host: "postgres"
# # port: 5432
# # role: "replica"
# # database: "postgres"

View File

@@ -1 +0,0 @@
sign: false

View File

@@ -1,5 +0,0 @@
remote: origin
target-branch: main
chart-dirs:
- charts

View File

@@ -1,4 +1,4 @@
FROM rust:bullseye
FROM rust:1.70-bullseye
# Dependencies
COPY --from=sclevine/yj /bin/yj /bin/yj

File diff suppressed because it is too large Load Diff

View File

@@ -11,7 +11,6 @@ RestartSec=1
Environment=RUST_LOG=info
LimitNOFILE=65536
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
ExecReload=/bin/kill -SIGHUP $MAINPID
[Install]
WantedBy=multi-user.target

View File

@@ -55,12 +55,7 @@ where
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
match query_parts
.first()
.unwrap_or(&"")
.to_ascii_uppercase()
.as_str()
{
match query_parts[0].to_ascii_uppercase().as_str() {
"BAN" => {
trace!("BAN");
ban(stream, query_parts).await
@@ -89,12 +84,7 @@ where
trace!("SHUTDOWN");
shutdown(stream).await
}
"SHOW" => match query_parts
.get(1)
.unwrap_or(&"")
.to_ascii_uppercase()
.as_str()
{
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
"HELP" => {
trace!("SHOW HELP");
show_help(stream).await

View File

@@ -1437,7 +1437,7 @@ where
.await
{
// We might be in some kind of error/in between protocol state
server.mark_bad(err.to_string().as_str());
server.mark_bad();
return Err(err);
}
@@ -1504,7 +1504,7 @@ where
match write_all_flush(&mut self.write, &response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad(err.to_string().as_str());
server.mark_bad();
return Err(err);
}
};
@@ -1729,13 +1729,14 @@ where
/// and also the pool's statement cache. Add it to extended protocol data.
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
let client_given_name = Parse::get_name(&message)?;
if !self.prepared_statements_enabled || client_given_name.is_empty() {
if !self.prepared_statements_enabled {
debug!("Anonymous parse message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_parse(message, None));
return Ok(());
}
let client_given_name = Parse::get_name(&message)?;
let parse: Parse = (&message).try_into()?;
// Compute the hash of the parse statement
@@ -1773,14 +1774,15 @@ where
/// saved in the client cache.
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
let client_given_name = Bind::get_name(&message)?;
if !self.prepared_statements_enabled || client_given_name.is_empty() {
if !self.prepared_statements_enabled {
debug!("Anonymous bind message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_bind(message, None));
return Ok(());
}
let client_given_name = Bind::get_name(&message)?;
match self.prepared_statements.get(&client_given_name) {
Some((rewritten_parse, _)) => {
let message = Bind::rename(message, &rewritten_parse.name)?;
@@ -1832,8 +1834,7 @@ where
}
let describe: Describe = (&message).try_into()?;
let client_given_name = describe.statement_name.clone();
if describe.target == 'P' || client_given_name.is_empty() {
if describe.target == 'P' {
debug!("Portal describe message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_describe(message, None));
@@ -1841,6 +1842,8 @@ where
return Ok(());
}
let client_given_name = describe.statement_name.clone();
match self.prepared_statements.get(&client_given_name) {
Some((rewritten_parse, _)) => {
let describe = describe.rename(&rewritten_parse.name);
@@ -1923,7 +1926,7 @@ where
Ok(_) => (),
Err(err) => {
// We might be in some kind of error/in between protocol state, better to just kill this server
server.mark_bad(err.to_string().as_str());
server.mark_bad();
return Err(err);
}
};
@@ -1990,13 +1993,11 @@ where
}
},
Err(_) => {
server.mark_bad(
format!(
"Statement timeout while talking to {:?} with user {}",
address, pool.settings.user.username
)
.as_str(),
error!(
"Statement timeout while talking to {:?} with user {}",
address, pool.settings.user.username
);
server.mark_bad();
pool.ban(address, BanReason::StatementTimeout, Some(client_stats));
error_response_terminal(&mut self.write, "pool statement timeout").await?;
Err(Error::StatementTimeout)

View File

@@ -38,12 +38,12 @@ pub enum Role {
Mirror,
}
impl std::fmt::Display for Role {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Role::Primary => write!(f, "primary"),
Role::Replica => write!(f, "replica"),
Role::Mirror => write!(f, "mirror"),
impl ToString for Role {
fn to_string(&self) -> String {
match *self {
Role::Primary => "primary".to_string(),
Role::Replica => "replica".to_string(),
Role::Mirror => "mirror".to_string(),
}
}
}
@@ -476,11 +476,11 @@ pub enum PoolMode {
Session,
}
impl std::fmt::Display for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PoolMode::Transaction => write!(f, "transaction"),
PoolMode::Session => write!(f, "session"),
impl ToString for PoolMode {
fn to_string(&self) -> String {
match *self {
PoolMode::Transaction => "transaction".to_string(),
PoolMode::Session => "session".to_string(),
}
}
}
@@ -493,13 +493,12 @@ pub enum LoadBalancingMode {
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
LeastOutstandingConnections,
}
impl std::fmt::Display for LoadBalancingMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LoadBalancingMode::Random => write!(f, "random"),
impl ToString for LoadBalancingMode {
fn to_string(&self) -> String {
match *self {
LoadBalancingMode::Random => "random".to_string(),
LoadBalancingMode::LeastOutstandingConnections => {
write!(f, "least_outstanding_connections")
"least_outstanding_connections".to_string()
}
}
}
@@ -1000,17 +999,15 @@ impl Config {
pub fn fill_up_auth_query_config(&mut self) {
for (_name, pool) in self.pools.iter_mut() {
if pool.auth_query.is_none() {
pool.auth_query.clone_from(&self.general.auth_query);
pool.auth_query = self.general.auth_query.clone();
}
if pool.auth_query_user.is_none() {
pool.auth_query_user
.clone_from(&self.general.auth_query_user);
pool.auth_query_user = self.general.auth_query_user.clone();
}
if pool.auth_query_password.is_none() {
pool.auth_query_password
.clone_from(&self.general.auth_query_password);
pool.auth_query_password = self.general.auth_query_password.clone();
}
}
}
@@ -1158,7 +1155,7 @@ impl Config {
"Default max server lifetime: {}ms",
self.general.server_lifetime
);
info!("Server round robin: {}", self.general.server_round_robin);
info!("Sever round robin: {}", self.general.server_round_robin);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);

View File

@@ -733,10 +733,6 @@ pub fn configure_socket(stream: &TcpStream) {
}
Err(err) => error!("Could not configure socket: {}", err),
}
match sock_ref.set_nodelay(true) {
Ok(_) => (),
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
}
}
pub trait BytesMutReader {
@@ -821,10 +817,10 @@ impl ExtendedProtocolData {
pub struct Parse {
code: char,
#[allow(dead_code)]
len: u32,
len: i32,
pub name: String,
query: String,
num_params: u16,
num_params: i16,
param_types: Vec<i32>,
}
@@ -834,11 +830,12 @@ impl TryFrom<&BytesMut> for Parse {
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
let mut cursor = Cursor::new(buf);
let code = cursor.get_u8() as char;
let len = cursor.get_u32();
let len = cursor.get_i32();
let name = cursor.read_string()?;
let query = cursor.read_string()?;
let num_params = cursor.get_u16();
let num_params = cursor.get_i16();
let mut param_types = Vec::new();
for _ in 0..num_params {
param_types.push(cursor.get_i32());
}
@@ -874,10 +871,10 @@ impl TryFrom<Parse> for BytesMut {
+ 4 * parse.num_params as usize;
bytes.put_u8(parse.code as u8);
bytes.put_u32(len as u32);
bytes.put_i32(len as i32);
bytes.put_slice(name);
bytes.put_slice(query);
bytes.put_u16(parse.num_params);
bytes.put_i16(parse.num_params);
for param in parse.param_types {
bytes.put_i32(param);
}
@@ -944,14 +941,14 @@ impl Parse {
pub struct Bind {
code: char,
#[allow(dead_code)]
len: u64,
len: i64,
portal: String,
pub prepared_statement: String,
num_param_format_codes: u16,
num_param_format_codes: i16,
param_format_codes: Vec<i16>,
num_param_values: u16,
num_param_values: i16,
param_values: Vec<(i32, BytesMut)>,
num_result_column_format_codes: u16,
num_result_column_format_codes: i16,
result_columns_format_codes: Vec<i16>,
}
@@ -961,17 +958,17 @@ impl TryFrom<&BytesMut> for Bind {
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
let mut cursor = Cursor::new(buf);
let code = cursor.get_u8() as char;
let len = cursor.get_u32();
let len = cursor.get_i32();
let portal = cursor.read_string()?;
let prepared_statement = cursor.read_string()?;
let num_param_format_codes = cursor.get_u16();
let num_param_format_codes = cursor.get_i16();
let mut param_format_codes = Vec::new();
for _ in 0..num_param_format_codes {
param_format_codes.push(cursor.get_i16());
}
let num_param_values = cursor.get_u16();
let num_param_values = cursor.get_i16();
let mut param_values = Vec::new();
for _ in 0..num_param_values {
@@ -993,7 +990,7 @@ impl TryFrom<&BytesMut> for Bind {
}
}
let num_result_column_format_codes = cursor.get_u16();
let num_result_column_format_codes = cursor.get_i16();
let mut result_columns_format_codes = Vec::new();
for _ in 0..num_result_column_format_codes {
@@ -1002,7 +999,7 @@ impl TryFrom<&BytesMut> for Bind {
Ok(Bind {
code,
len: len as u64,
len: len as i64,
portal,
prepared_statement,
num_param_format_codes,
@@ -1041,19 +1038,19 @@ impl TryFrom<Bind> for BytesMut {
len += 2 * bind.num_result_column_format_codes as usize;
bytes.put_u8(bind.code as u8);
bytes.put_u32(len as u32);
bytes.put_i32(len as i32);
bytes.put_slice(portal);
bytes.put_slice(prepared_statement);
bytes.put_u16(bind.num_param_format_codes);
bytes.put_i16(bind.num_param_format_codes);
for param_format_code in bind.param_format_codes {
bytes.put_i16(param_format_code);
}
bytes.put_u16(bind.num_param_values);
bytes.put_i16(bind.num_param_values);
for (param_len, param) in bind.param_values {
bytes.put_i32(param_len);
bytes.put_slice(&param);
}
bytes.put_u16(bind.num_result_column_format_codes);
bytes.put_i16(bind.num_result_column_format_codes);
for result_column_format_code in bind.result_columns_format_codes {
bytes.put_i16(result_column_format_code);
}
@@ -1067,7 +1064,7 @@ impl Bind {
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
let mut cursor = Cursor::new(buf);
// Skip the code and length
cursor.advance(mem::size_of::<u8>() + mem::size_of::<u32>());
cursor.advance(mem::size_of::<u8>() + mem::size_of::<i32>());
cursor.read_string()?;
cursor.read_string()
}
@@ -1077,17 +1074,17 @@ impl Bind {
let mut cursor = Cursor::new(&buf);
// Read basic data from the cursor
let code = cursor.get_u8();
let current_len = cursor.get_u32();
let current_len = cursor.get_i32();
let portal = cursor.read_string()?;
let prepared_statement = cursor.read_string()?;
// Calculate new length
let new_len = current_len + new_name.len() as u32 - prepared_statement.len() as u32;
let new_len = current_len + new_name.len() as i32 - prepared_statement.len() as i32;
// Begin building the response buffer
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
response_buf.put_u8(code);
response_buf.put_u32(new_len);
response_buf.put_i32(new_len);
// Put the portal and new name into the buffer
// Note: panic if the provided string contains null byte
@@ -1111,7 +1108,7 @@ pub struct Describe {
code: char,
#[allow(dead_code)]
len: u32,
len: i32,
pub target: char,
pub statement_name: String,
}
@@ -1122,7 +1119,7 @@ impl TryFrom<&BytesMut> for Describe {
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
let mut cursor = Cursor::new(bytes);
let code = cursor.get_u8() as char;
let len = cursor.get_u32();
let len = cursor.get_i32();
let target = cursor.get_u8() as char;
let statement_name = cursor.read_string()?;
@@ -1145,7 +1142,7 @@ impl TryFrom<Describe> for BytesMut {
let len = 4 + 1 + statement_name.len();
bytes.put_u8(describe.code as u8);
bytes.put_u32(len as u32);
bytes.put_i32(len as i32);
bytes.put_u8(describe.target as u8);
bytes.put_slice(statement_name);

View File

@@ -85,9 +85,8 @@ impl MirroredClient {
match recv_result {
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()).as_str()
);
server.mark_bad();
error!("Failed to receive from mirror {:?} {:?}", err, address.clone());
}
}
}
@@ -99,9 +98,8 @@ impl MirroredClient {
match server.send(&BytesMut::from(&bytes[..])).await {
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to receive from mirror {:?} {:?}", err, address.clone()).as_str()
);
server.mark_bad();
error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone())
}
}
}

View File

@@ -769,6 +769,7 @@ impl ConnectionPool {
);
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
address.stats.error();
client_stats.idle();
client_stats.checkout_error();
continue;
}
@@ -787,7 +788,7 @@ impl ConnectionPool {
// Health checks are pretty expensive.
if !require_healthcheck {
let checkout_time = now.elapsed().as_micros() as u64;
client_stats.checkout_success();
client_stats.checkout_time(checkout_time);
server
.stats()
.checkout_time(checkout_time, client_stats.application_name());
@@ -801,7 +802,7 @@ impl ConnectionPool {
.await
{
let checkout_time = now.elapsed().as_micros() as u64;
client_stats.checkout_success();
client_stats.checkout_time(checkout_time);
server
.stats()
.checkout_time(checkout_time, client_stats.application_name());
@@ -813,7 +814,10 @@ impl ConnectionPool {
}
}
client_stats.checkout_error();
client_stats.idle();
let checkout_time = now.elapsed().as_micros() as u64;
client_stats.checkout_time(checkout_time);
Err(Error::AllServersDown)
}
@@ -839,7 +843,7 @@ impl ConnectionPool {
Ok(res) => match res {
Ok(_) => {
let checkout_time: u64 = start.elapsed().as_micros() as u64;
client_info.checkout_success();
client_info.checkout_time(checkout_time);
server
.stats()
.checkout_time(checkout_time, client_info.application_name());
@@ -867,7 +871,7 @@ impl ConnectionPool {
}
// Don't leave a bad connection in the pool.
server.mark_bad("failed health check");
server.mark_bad();
self.ban(address, BanReason::FailedHealthCheck, Some(client_info));
false

View File

@@ -1,41 +1,23 @@
use http_body_util::Full;
use hyper::body;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{debug, error, info};
use phf::phf_map;
use std::collections::HashMap;
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use tokio::net::TcpListener;
use std::sync::Arc;
use crate::config::Address;
use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::get_server_stats;
use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};
struct MetricHelpType {
help: &'static str,
ty: &'static str,
}
struct ServerPrometheusStats {
bytes_received: u64,
bytes_sent: u64,
transaction_count: u64,
query_count: u64,
error_count: u64,
active_count: u64,
idle_count: u64,
login_count: u64,
tested_count: u64,
}
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
// counters only increase
// gauges can arbitrarily increase or decrease
@@ -138,46 +120,22 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
},
"servers_bytes_received" => MetricHelpType {
help: "Volume in bytes of network traffic received by server",
ty: "counter",
ty: "gauge",
},
"servers_bytes_sent" => MetricHelpType {
help: "Volume in bytes of network traffic sent by server",
ty: "counter",
ty: "gauge",
},
"servers_transaction_count" => MetricHelpType {
help: "Number of transactions executed by server",
ty: "counter",
ty: "gauge",
},
"servers_query_count" => MetricHelpType {
help: "Number of queries executed by server",
ty: "counter",
ty: "gauge",
},
"servers_error_count" => MetricHelpType {
help: "Number of errors",
ty: "counter",
},
"servers_idle_count" => MetricHelpType {
help: "Number of server connection in idle state",
ty: "gauge",
},
"servers_active_count" => MetricHelpType {
help: "Number of server connection in active state",
ty: "gauge",
},
"servers_tested_count" => MetricHelpType {
help: "Number of server connection in tested state",
ty: "gauge",
},
"servers_login_count" => MetricHelpType {
help: "Number of server connection in login state",
ty: "gauge",
},
"servers_is_banned" => MetricHelpType {
help: "0 if server is not banned, 1 if server is banned",
ty: "gauge",
},
"servers_is_paused" => MetricHelpType {
help: "0 if server is not paused, 1 if server is paused",
ty: "gauge",
},
"databases_pool_size" => MetricHelpType {
@@ -245,9 +203,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels)
}
@@ -262,9 +218,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels)
}
@@ -274,9 +229,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels)
}
@@ -290,9 +243,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
}
}
async fn prometheus_stats(
request: Request<body::Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => {
let mut lines = Vec::new();
@@ -378,51 +329,34 @@ fn push_database_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW SERVERS admin command.
fn push_server_stats(lines: &mut Vec<String>) {
let server_stats = get_server_stats();
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
for (_, stats) in server_stats {
let entry = prom_stats
.entry(stats.address_name())
.or_insert(ServerPrometheusStats {
bytes_received: 0,
bytes_sent: 0,
transaction_count: 0,
query_count: 0,
error_count: 0,
active_count: 0,
idle_count: 0,
login_count: 0,
tested_count: 0,
});
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
entry.query_count += stats.query_count.load(Ordering::Relaxed);
entry.error_count += stats.error_count.load(Ordering::Relaxed);
match stats.state.load(Ordering::Relaxed) {
crate::stats::ServerState::Login => entry.login_count += 1,
crate::stats::ServerState::Active => entry.active_count += 1,
crate::stats::ServerState::Tested => entry.tested_count += 1,
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
server_stats_by_addresses.insert(stats.address_name(), stats);
}
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
if let Some(server_info) = prom_stats.get(&address.name()) {
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
let metrics = [
("bytes_received", server_info.bytes_received),
("bytes_sent", server_info.bytes_sent),
("transaction_count", server_info.transaction_count),
("query_count", server_info.query_count),
("error_count", server_info.error_count),
("idle_count", server_info.idle_count),
("active_count", server_info.active_count),
("login_count", server_info.login_count),
("tested_count", server_info.tested_count),
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
("is_paused", if pool.paused() { 1 } else { 0 }),
(
"bytes_received",
server_info.bytes_received.load(Ordering::Relaxed),
),
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
(
"transaction_count",
server_info.transaction_count.load(Ordering::Relaxed),
),
(
"query_count",
server_info.query_count.load(Ordering::Relaxed),
),
(
"error_count",
server_info.error_count.load(Ordering::Relaxed),
),
];
for (key, value) in metrics {
if let Some(prometheus_metric) =
@@ -440,35 +374,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
}
pub async fn start_metric_server(http_addr: SocketAddr) {
let listener = TcpListener::bind(http_addr);
let listener = match listener.await {
Ok(listener) => listener,
Err(e) => {
error!("Failed to bind prometheus server to HTTP address: {}.", e);
return;
}
};
let http_service_factory =
make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) });
let server = Server::bind(&http_addr).serve(http_service_factory);
info!(
"Exposing prometheus metrics on http://{}/metrics.",
http_addr
);
loop {
let stream = match listener.accept().await {
Ok((stream, _)) => stream,
Err(e) => {
error!("Error accepting connection: {}", e);
continue;
}
};
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(prometheus_stats))
.await
{
eprintln!("Error serving HTTP connection for metrics: {:?}", err);
}
});
if let Err(e) = server.await {
error!("Failed to run HTTP server: {}.", e);
}
}

View File

@@ -427,12 +427,8 @@ impl QueryRouter {
None => (),
};
let has_locks = !query.locks.is_empty();
if has_locks {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary.
// If we already visited a write statement, we should be going to the primary.
if !visited_write_statement {
self.active_role = match self.primary_reads_enabled() {
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
true => None, // Any server role is fine in this case.
@@ -503,7 +499,6 @@ impl QueryRouter {
table: _,
on: _,
returning: _,
ignore: _,
} => {
// Not supported in postgres.
assert!(or.is_none());
@@ -511,9 +506,7 @@ impl QueryRouter {
assert!(after_columns.is_empty());
Self::process_table(table_name, &mut table_names);
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
}
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
}
Delete {
tables,
@@ -521,8 +514,6 @@ impl QueryRouter {
using,
selection,
returning: _,
order_by: _,
limit: _,
} => {
if let Some(expr) = selection {
exprs.push(expr.clone());
@@ -1162,29 +1153,6 @@ mod test {
}
}
#[test]
fn test_select_for_update() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let queries_in_primary_role = vec![
simple_query("BEGIN"), // Transaction start
simple_query("SELECT * FROM items WHERE id = 5 FOR UPDATE"),
simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
];
for query in queries_in_primary_role {
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
// query without lock do not change role
let query = simple_query("SELECT * FROM items WHERE id = 5");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
assert_eq!(qr.role(), None);
}
#[test]
fn test_infer_primary_reads_enabled() {
QueryRouter::setup();
@@ -1399,19 +1367,6 @@ mod test {
assert!(!qr.query_parser_enabled());
}
#[test]
fn test_query_parser() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
}
#[test]
fn test_update_from_pool_settings() {
QueryRouter::setup();

View File

@@ -698,6 +698,7 @@ impl Server {
))
}
};
trace!("Error: {}", error_code);
match error_code {
@@ -1012,12 +1013,6 @@ impl Server {
// which can leak between clients. This is a best effort to block bad clients
// from poisoning a transaction-mode pool by setting inappropriate session variables
match command.as_str() {
"DISCARD ALL" => {
self.clear_prepared_statement_cache();
}
"DEALLOCATE ALL" => {
self.clear_prepared_statement_cache();
}
"SET" => {
// We don't detect set statements in transactions
// No great way to differentiate between set and set local
@@ -1137,12 +1132,6 @@ impl Server {
has_it
}
fn clear_prepared_statement_cache(&mut self) {
if let Some(cache) = &mut self.prepared_statement_cache {
cache.clear();
}
}
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
@@ -1290,8 +1279,8 @@ impl Server {
}
/// Indicate that this server connection cannot be re-used and must be discarded.
pub fn mark_bad(&mut self, reason: &str) {
error!("Server {:?} marked bad, reason: {}", self.address, reason);
pub fn mark_bad(&mut self) {
error!("Server {:?} marked bad", self.address);
self.bad = true;
}

View File

@@ -14,11 +14,11 @@ pub enum ShardingFunction {
Sha1,
}
impl std::fmt::Display for ShardingFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"),
ShardingFunction::Sha1 => write!(f, "sha1"),
impl ToString for ShardingFunction {
fn to_string(&self) -> String {
match *self {
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
ShardingFunction::Sha1 => "sha1".to_string(),
}
}
}

View File

@@ -38,13 +38,10 @@ pub struct ClientStats {
/// Total time spent waiting for a connection from pool, measures in microseconds
pub total_wait_time: Arc<AtomicU64>,
/// Maximum time spent waiting for a connection from pool, measures in microseconds
pub max_wait_time: Arc<AtomicU64>,
// Time when the client started waiting for a connection from pool, measures in microseconds
// We use connect_time as the reference point for this value
// U64 can represent ~5850 centuries in microseconds, so we should be fine
pub wait_start_us: Arc<AtomicU64>,
/// When this client started waiting.
/// Stored as microseconds since connect_time so it can fit in an AtomicU64 instead
/// of us using an "AtomicInstant"
pub wait_start: Arc<AtomicU64>,
/// Current state of the client
pub state: Arc<AtomicClientState>,
@@ -68,8 +65,7 @@ impl Default for ClientStats {
username: String::new(),
pool_name: String::new(),
total_wait_time: Arc::new(AtomicU64::new(0)),
max_wait_time: Arc::new(AtomicU64::new(0)),
wait_start_us: Arc::new(AtomicU64::new(0)),
wait_start: Arc::new(AtomicU64::new(0)),
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
transaction_count: Arc::new(AtomicU64::new(0)),
query_count: Arc::new(AtomicU64::new(0)),
@@ -117,9 +113,11 @@ impl ClientStats {
/// Reports a client is waiting for a connection
pub fn waiting(&self) {
let wait_start = self.connect_time.elapsed().as_micros() as u64;
self.wait_start_us.store(wait_start, Ordering::Relaxed);
// safe to truncate, we only lose info if duration is greater than ~585,000 years
self.wait_start.store(
Instant::now().duration_since(self.connect_time).as_micros() as u64,
Ordering::Relaxed,
);
self.state.store(ClientState::Waiting, Ordering::Relaxed);
}
@@ -131,13 +129,6 @@ impl ClientStats {
/// Reports a client has failed to obtain a connection from a connection pool
pub fn checkout_error(&self) {
self.state.store(ClientState::Idle, Ordering::Relaxed);
self.update_wait_times();
}
/// Reports a client has succeeded in obtaining a connection from a connection pool
pub fn checkout_success(&self) {
self.state.store(ClientState::Active, Ordering::Relaxed);
self.update_wait_times();
}
/// Reports a client has had the server assigned to it be banned
@@ -146,26 +137,10 @@ impl ClientStats {
self.error_count.fetch_add(1, Ordering::Relaxed);
}
fn update_wait_times(&self) {
if self.wait_start_us.load(Ordering::Relaxed) == 0 {
return;
}
let wait_time_us = self.get_current_wait_time_us();
/// Reporters the time spent by a client waiting to get a healthy connection from the pool
pub fn checkout_time(&self, microseconds: u64) {
self.total_wait_time
.fetch_add(wait_time_us, Ordering::Relaxed);
self.max_wait_time
.fetch_max(wait_time_us, Ordering::Relaxed);
self.wait_start_us.store(0, Ordering::Relaxed);
}
pub fn get_current_wait_time_us(&self) -> u64 {
let wait_start_us = self.wait_start_us.load(Ordering::Relaxed);
let microseconds_since_connection_epoch = self.connect_time.elapsed().as_micros() as u64;
if wait_start_us == 0 || microseconds_since_connection_epoch < wait_start_us {
return 0;
}
microseconds_since_connection_epoch - wait_start_us
.fetch_add(microseconds, Ordering::Relaxed);
}
/// Report a query executed by a client against a server

View File

@@ -4,6 +4,7 @@ use super::{ClientState, ServerState};
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
use std::collections::HashMap;
use std::sync::atomic::*;
use tokio::time::Instant;
use crate::pool::get_all_pools;
@@ -53,6 +54,7 @@ impl PoolStats {
);
}
let now = Instant::now();
for client in client_map.values() {
match map.get_mut(&PoolIdentifier {
db: client.pool_name(),
@@ -62,12 +64,15 @@ impl PoolStats {
match client.state.load(Ordering::Relaxed) {
ClientState::Active => pool_stats.cl_active += 1,
ClientState::Idle => pool_stats.cl_idle += 1,
ClientState::Waiting => pool_stats.cl_waiting += 1,
}
let wait_start_us = client.wait_start_us.load(Ordering::Relaxed);
if wait_start_us > 0 {
let wait_time_us = client.get_current_wait_time_us();
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time_us);
ClientState::Waiting => {
pool_stats.cl_waiting += 1;
// wait_start is measured as microseconds since connect_time
// so compute wait_time as (now() - connect_time) - (wait_start - connect_time)
let duration_since_connect = now.duration_since(client.connect_time());
let wait_time = (duration_since_connect.as_micros() as u64)
- client.wait_start.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time);
}
}
}
None => debug!("Client from an obselete pool"),

View File

@@ -1,34 +0,0 @@
GREEN="\033[0;32m"
RED="\033[0;31m"
BLUE="\033[0;34m"
RESET="\033[0m"
cd tests/docker/
docker compose kill main || true
docker compose build main
docker compose down
docker compose up -d
# wait for the container to start
while ! docker compose exec main ls; do
echo "Waiting for test environment to start"
sleep 1
done
echo "==================================="
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
docker compose exec --workdir /app main cargo build
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
docker compose exec --workdir /app/tests/ruby main bundle install
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
echo "You can rebuild PgCat from within the container by running"
echo -e " ${GREEN}cargo build${RESET}"
echo "and then run the tests again"
echo "==================================="
docker compose exec --workdir /app/tests main bash

View File

@@ -1,3 +1,4 @@
version: "3"
services:
pg1:
image: postgres:14
@@ -47,8 +48,6 @@ services:
main:
build: .
command: ["bash", "/app/tests/docker/run.sh"]
environment:
- INTERACTIVE_TEST_ENVIRONMENT=true
volumes:
- ../../:/app/
- /app/target/

View File

@@ -5,38 +5,6 @@ rm /app/*.profraw || true
rm /app/pgcat.profdata || true
rm -rf /app/cov || true
# Prepares the interactive test environment
#
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
ports=(5432 7432 8432 9432 10432)
for port in "${ports[@]}"; do
is_it_up=0
attempts=0
while [ $is_it_up -eq 0 ]; do
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo "PostgreSQL on port $port is up."
is_it_up=1
else
attempts=$((attempts+1))
if [ $attempts -gt 10 ]; then
echo "PostgreSQL on port $port is down, giving up."
exit 1
fi
echo "PostgreSQL on port $port is down, waiting for it to start."
sleep 1
fi
done
done
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
sleep 100000000000000000
exit 0
fi
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
export RUSTC_BOOTSTRAP=1
export CARGO_INCREMENTAL=0

View File

@@ -91,27 +91,6 @@ describe "Admin" do
end
end
[
"SHOW ME THE MONEY",
"SHOW ME THE WAY",
"SHOW UP",
"SHOWTIME",
"HAMMER TIME",
"SHOWN TO BE TRUE",
"SHOW ",
"SHOW ",
"SHOW 1",
";;;;;"
].each do |cmd|
describe "Bad command #{cmd}" do
it "does not panic and responds with PG::SystemError" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
expect { admin_conn.async_exec(cmd) }.to raise_error(PG::SystemError).with_message(/Unsupported/)
admin_conn.close
end
end
end
describe "PAUSE" do
it "pauses all pools" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)

View File

@@ -1,145 +0,0 @@
class PostgresMessage
# Base class for common functionality
def encode_string(str)
"#{str}\0" # Encode a string with a null terminator
end
def encode_int16(value)
[value].pack('n') # Encode an Int16
end
def encode_int32(value)
[value].pack('N') # Encode an Int32
end
def message_prefix(type, length)
"#{type}#{encode_int32(length)}" # Message type and length prefix
end
end
class SimpleQueryMessage < PostgresMessage
attr_accessor :query
def initialize(query = "")
@query = query
end
def to_bytes
query_bytes = encode_string(@query)
length = 4 + query_bytes.size # Length includes 4 bytes for length itself
message_prefix('Q', length) + query_bytes
end
end
class ParseMessage < PostgresMessage
attr_accessor :statement_name, :query, :parameter_types
def initialize(statement_name = "", query = "", parameter_types = [])
@statement_name = statement_name
@query = query
@parameter_types = parameter_types
end
def to_bytes
statement_name_bytes = encode_string(@statement_name)
query_bytes = encode_string(@query)
parameter_types_bytes = @parameter_types.pack('N*')
length = 4 + statement_name_bytes.size + query_bytes.size + 2 + parameter_types_bytes.size
message_prefix('P', length) + statement_name_bytes + query_bytes + encode_int16(@parameter_types.size) + parameter_types_bytes
end
end
class BindMessage < PostgresMessage
attr_accessor :portal_name, :statement_name, :parameter_format_codes, :parameters, :result_column_format_codes
def initialize(portal_name = "", statement_name = "", parameter_format_codes = [], parameters = [], result_column_format_codes = [])
@portal_name = portal_name
@statement_name = statement_name
@parameter_format_codes = parameter_format_codes
@parameters = parameters
@result_column_format_codes = result_column_format_codes
end
def to_bytes
portal_name_bytes = encode_string(@portal_name)
statement_name_bytes = encode_string(@statement_name)
parameter_format_codes_bytes = @parameter_format_codes.pack('n*')
parameters_bytes = @parameters.map do |param|
if param.nil?
encode_int32(-1)
else
encode_int32(param.bytesize) + param
end
end.join
result_column_format_codes_bytes = @result_column_format_codes.pack('n*')
length = 4 + portal_name_bytes.size + statement_name_bytes.size + 2 + parameter_format_codes_bytes.size + 2 + parameters_bytes.size + 2 + result_column_format_codes_bytes.size
message_prefix('B', length) + portal_name_bytes + statement_name_bytes + encode_int16(@parameter_format_codes.size) + parameter_format_codes_bytes + encode_int16(@parameters.size) + parameters_bytes + encode_int16(@result_column_format_codes.size) + result_column_format_codes_bytes
end
end
class DescribeMessage < PostgresMessage
attr_accessor :type, :name
def initialize(type = 'S', name = "")
@type = type
@name = name
end
def to_bytes
name_bytes = encode_string(@name)
length = 4 + 1 + name_bytes.size
message_prefix('D', length) + @type + name_bytes
end
end
class ExecuteMessage < PostgresMessage
attr_accessor :portal_name, :max_rows
def initialize(portal_name = "", max_rows = 0)
@portal_name = portal_name
@max_rows = max_rows
end
def to_bytes
portal_name_bytes = encode_string(@portal_name)
length = 4 + portal_name_bytes.size + 4
message_prefix('E', length) + portal_name_bytes + encode_int32(@max_rows)
end
end
class FlushMessage < PostgresMessage
def to_bytes
length = 4
message_prefix('H', length)
end
end
class SyncMessage < PostgresMessage
def to_bytes
length = 4
message_prefix('S', length)
end
end
class CloseMessage < PostgresMessage
attr_accessor :type, :name
def initialize(type = 'S', name = "")
@type = type
@name = name
end
def to_bytes
name_bytes = encode_string(@name)
length = 4 + 1 + name_bytes.size
message_prefix('C', length) + @type + name_bytes
end
end

View File

@@ -1,6 +1,5 @@
require 'socket'
require 'digest/md5'
require_relative 'frontend_messages'
BACKEND_MESSAGE_CODES = {
'Z' => "ReadyForQuery",
@@ -19,13 +18,9 @@ class PostgresSocket
@host = host
@socket = TCPSocket.new @host, @port
@parameters = {}
@verbose = false
@verbose = true
end
def send_message(message)
@socket.write(message.to_bytes)
end
def send_md5_password_message(username, password, salt)
m = Digest::MD5.hexdigest(password + username)
m = Digest::MD5.hexdigest(m + salt.map(&:chr).join(""))
@@ -118,6 +113,107 @@ class PostgresSocket
log "[F] Sent CancelRequest message"
end
def send_query_message(query)
query_size = query.length
message_size = 1 + 4 + query_size
message = []
message << "Q".ord
message << [message_size].pack('l>').unpack('CCCC') # 4
message << query.split('').map(&:ord) # 2, 11
message << 0 # 1, 12
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent Q message (#{query})"
end
def send_parse_message(query)
query_size = query.length
message_size = 2 + 2 + 4 + query_size
message = []
message << "P".ord
message << [message_size].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << query.split('').map(&:ord) # 2, 11
message << 0 # 1, 12
message << [0, 0]
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent P message (#{query})"
end
def send_bind_message
message = []
message << "B".ord
message << [12].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << 0 # unnamed statement
message << [0, 0] # 2
message << [0, 0] # 2
message << [0, 0] # 2
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent B message"
end
def send_describe_message(mode)
message = []
message << "D".ord
message << [6].pack('l>').unpack('CCCC') # 4
message << mode.ord
message << 0 # unnamed statement
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent D message"
end
def send_execute_message(limit=0)
message = []
message << "E".ord
message << [9].pack('l>').unpack('CCCC') # 4
message << 0 # unnamed statement
message << [limit].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent E message"
end
def send_sync_message
message = []
message << "S".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent S message"
end
def send_copydone_message
message = []
message << "c".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent c message"
end
def send_copyfail_message
message = []
message << "f".ord
message << [5].pack('l>').unpack('CCCC') # 4
message << 0
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent f message"
end
def send_flush_message
message = []
message << "H".ord
message << [4].pack('l>').unpack('CCCC') # 4
message.flatten!
@socket.write(message.flatten.pack('C*'))
log "[F] Sent H message"
end
def read_from_server()
output_messages = []
retry_count = 0

View File

@@ -16,14 +16,10 @@ describe "Portocol handling" do
end
def run_comparison(sequence, socket_a, socket_b)
sequence.each do |msg|
if msg.is_a?(Symbol)
socket_a.send(msg)
socket_b.send(msg)
else
socket_a.send_message(msg)
socket_b.send_message(msg)
end
sequence.each do |msg, *args|
socket_a.send(msg, *args)
socket_b.send(msg, *args)
compare_messages(
socket_a.read_from_server,
socket_b.read_from_server
@@ -87,9 +83,9 @@ describe "Portocol handling" do
context "Cancel Query" do
let(:sequence) {
[
SimpleQueryMessage.new("SELECT pg_sleep(5)"),
:cancel_query
[
[:send_query_message, "SELECT pg_sleep(5)"],
[:cancel_query]
]
}
@@ -99,12 +95,12 @@ describe "Portocol handling" do
xcontext "Simple query after parse" do
let(:sequence) {
[
ParseMessage.new("", "SELECT 5", []),
SimpleQueryMessage.new("SELECT 1"),
BindMessage.new("", "", [], [], [0]),
DescribeMessage.new("P", ""),
ExecuteMessage.new("", 1),
SyncMessage.new
[:send_parse_message, "SELECT 5"],
[:send_query_message, "SELECT 1"],
[:send_bind_message],
[:send_describe_message, "P"],
[:send_execute_message],
[:send_sync_message],
]
}
@@ -115,8 +111,8 @@ describe "Portocol handling" do
xcontext "Flush message" do
let(:sequence) {
[
ParseMessage.new("", "SELECT 1", []),
FlushMessage.new
[:send_parse_message, "SELECT 1"],
[:send_flush_message]
]
}
@@ -126,7 +122,9 @@ describe "Portocol handling" do
xcontext "Bind without parse" do
let(:sequence) {
[BindMessage.new("", "", [], [], [0])]
[
[:send_bind_message]
]
}
# This is known to fail.
# Server responds immediately, Proxy buffers the message
@@ -135,155 +133,23 @@ describe "Portocol handling" do
context "Simple message" do
let(:sequence) {
[SimpleQueryMessage.new("SELECT 1")]
[[:send_query_message, "SELECT 1"]]
}
it_behaves_like "at parity with database"
end
10.times do |i|
context "Extended protocol" do
let(:sequence) {
[
ParseMessage.new("", "SELECT 1", []),
BindMessage.new("", "", [], [], [0]),
DescribeMessage.new("S", ""),
ExecuteMessage.new("", 1),
SyncMessage.new
]
}
it_behaves_like "at parity with database"
end
context "Extended protocol" do
let(:sequence) {
[
[:send_parse_message, "SELECT 1"],
[:send_bind_message],
[:send_describe_message, "P"],
[:send_execute_message],
[:send_sync_message],
]
}
it_behaves_like "at parity with database"
end
describe "Protocol-level prepared statements" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "transaction") }
before do
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
table_query = "CREATE TABLE IF NOT EXISTS employees (employee_id SERIAL PRIMARY KEY, salary NUMERIC(10, 2) CHECK (salary > 0));"
q_sock.send_message(SimpleQueryMessage.new(table_query))
q_sock.close
current_configs = processes.pgcat.current_config
current_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = 500
processes.pgcat.update_config(current_configs)
processes.pgcat.reload_config
end
after do
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
table_query = "DROP TABLE IF EXISTS employees;"
q_sock.send_message(SimpleQueryMessage.new(table_query))
q_sock.close
end
context "When unnamed prepared statements are used" do
it "does not cache them" do
socket = PostgresSocket.new('localhost', processes.pgcat.port)
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
socket.read_from_server
10.times do |i|
socket.send_message(ParseMessage.new("", "SELECT #{i}", []))
socket.send_message(BindMessage.new("", "", [], [], [0]))
socket.send_message(DescribeMessage.new("S", ""))
socket.send_message(ExecuteMessage.new("", 1))
socket.send_message(SyncMessage.new)
socket.read_from_server
end
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
result = socket.read_from_server
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
expect(number_of_saved_statements).to eq(0)
end
end
context "When named prepared statements are used" do
it "caches them" do
socket = PostgresSocket.new('localhost', processes.pgcat.port)
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
socket.read_from_server
3.times do
socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
socket.send_message(BindMessage.new("", "my_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
socket.send_message(SyncMessage.new)
socket.read_from_server
end
3.times do
socket.send_message(ParseMessage.new("my_other_query", "SELECT * FROM employees WHERE salary in ($1,$2,$3)", [0,0,0]))
socket.send_message(BindMessage.new("", "my_other_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
socket.send_message(SyncMessage.new)
socket.read_from_server
end
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
result = socket.read_from_server
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
expect(number_of_saved_statements).to eq(2)
end
end
context "When DISCARD ALL/DEALLOCATE ALL are called" do
it "resets server and client caches" do
socket = PostgresSocket.new('localhost', processes.pgcat.port)
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
20.times do |i|
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
end
20.times do |i|
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
end
socket.send_message(SyncMessage.new)
socket.read_from_server
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
socket.read_from_server
responses = []
4.times do |i|
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
socket.send_message(SyncMessage.new)
responses += socket.read_from_server
end
errors = responses.select { |message| message[:code] == 'E' }
error_message = errors.map { |message| message[:bytes].map(&:chr).join("") }.join("\n")
raise StandardError, "Encountered the following errors: #{error_message}" if errors.length > 0
end
end
context "Maximum number of bound paramters" do
it "does not crash" do
test_socket = PostgresSocket.new('localhost', processes.pgcat.port)
test_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
types = Array.new(65_535) { |i| 0 }
params = Array.new(65_535) { |i| "$#{i+1}" }.join(",")
test_socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in (#{params})", types))
test_socket.send_message(BindMessage.new("my_query", "my_query", types, types.map(&:to_s), types))
test_socket.send_message(SyncMessage.new)
# If the proxy crashes, this will raise an error
expect { test_socket.read_from_server }.to_not raise_error
test_socket.close
end
end
end
end
end

View File

@@ -233,19 +233,17 @@ describe "Stats" do
sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["maxwait"]).to eq("1")
expect(results["cl_waiting"]).to eq("2")
expect(results["cl_active"]).to eq("2")
expect(results["sv_active"]).to eq("2")
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("4")
@@ -257,24 +255,31 @@ describe "Stats" do
it "show correct max_wait" do
threads = []
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
# two connections waiting => they report wait time
sleep(1.1) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
# Value is only reported when there are clients waiting
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(20_000).of(100_000)
expect(results["maxwait_us"].to_i).to be_within(200_000).of(100_000)
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
# no clients are waiting so value is 0
# no connections waiting => no reported wait time
expect(results["maxwait"]).to eq("0")
expect(results["maxwait_us"]).to eq("0")
connections.map(&:close)
sleep(4.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("0")
threads.map(&:join)
end
end

664
tests/rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,11 +15,13 @@ async fn test_prepared_statements() {
for _ in 0..5 {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for i in 0..1000 {
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
for _ in 0..1000 {
match sqlx::query("SELECT one").fetch_all(&pool).await {
Ok(_) => (),
Err(err) => {
panic!("prepared statement error: {}", err);
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err);
}
}
}
}