Compare commits


29 Commits

Author SHA1 Message Date
Mostafa
3796e26402 Fix contact info for Helm chart (#861)
* Fix contact info for Helm chart
2024-11-11 09:24:17 -06:00
Mostafa
0ee59c0c40 Another no-op helm release (#853) 2024-11-08 06:07:12 -06:00
Mostafa
b61d2cc6f0 Use main branch for helm chart releases (#852) 2024-11-08 06:04:42 -06:00
Jose Fernández
c11418c083 Revert "Do not unban replicas if a primary is available" (#850)
Revert "Do not unban replicas if a primary is available (#843)"

This reverts commit cdcfa99fb9.
2024-11-07 22:00:43 +01:00
Jose Fernández
c9544bdff2 Fix default_role being ignored when query_parser_enabled was false (#847)
Fix default_role being ignored when query_parser_enabled was false
2024-11-07 11:11:49 -06:00
Jose Fernández
cdcfa99fb9 Do not unban replicas if a primary is available (#843)
Add `unban_replicas_when_all_banned` to control the behavior of unbanning replicas.
2024-11-07 11:11:11 -06:00
Víťa Tauer
f27dc6b483 Fixing invalid setting name in pgcat.toml (#849) 2024-11-07 06:17:09 -06:00
Mostafa
326efc22b3 Another no-op release for helm (#845)
Another no-op release
2024-11-02 18:05:41 -05:00
Mostafa
01c6afb2e5 Attempt a helm chart release (#844)
Attempt a release

Co-authored-by: Mostafa <no_reply@github.com>
2024-11-02 11:55:18 -05:00
Nicolas Vanelslande
a68071dd28 Bump bb8 from 0.8.1 to 0.8.6 (#709)
* Update bb8 to 0.8.6

To get https://github.com/djc/bb8/pull/186 and https://github.com/djc/bb8/pull/189
which fix potential deadlocks (https://github.com/djc/bb8/issues/154).

Also, this (https://github.com/djc/bb8/pull/225) was needed to prevent a connection
leak which was conveniently spotted in our integration tests.

* Ignore ./.bundle (created by dev console)

---------

Co-authored-by: Jose Fernandez (magec) <joseferper@gmail.com>
2024-10-28 06:49:36 -05:00
Mostafa
c27d801abf Rename a couple of variables (#839) 2024-10-23 06:38:07 -05:00
Javier Goday
186e72298f #829: read/write splitting on CTE mutable statements (#835) 2024-10-23 06:20:04 -05:00
Sebastian Serth
3935366d86 End Prometheus stats with a new line separator (#826)
End prometheus stats with a new line separator

According to the [OpenMetrics specification](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#overall-structure), each line MUST end with `\n`. Previously, the last line did not end with `\n`, so strict parsers had trouble reading the Prometheus stats.
2024-09-22 17:14:04 -05:00
Sean McGivern
b575935b1d Improve documentation for connect_timeout and add min_pool_size (#822)
Currently, `connect_timeout` sounds like it should be for connections to
the Postgres server. It's actually used for obtaining a connection from
the pool.
2024-09-18 06:56:17 -05:00
Shijun Wang
efbab1c333 Helm chart improvements including allowing user password to be pulled from K8s secret (#753)
* Make user min_pool_size configurable

* Set user server_lifetime only if specified

* Increment chart version

* Use default instead of or

* Allow enabling server_tls

* statement_timeout default value

* Allow pulling password from existing secret

---------

Co-authored-by: Mostafa Abdelraouf <mostafa.mohmmed@gmail.com>
2024-09-14 09:57:17 -05:00
Mostafa Abdelraouf
9f12d7958e Fix Ruby tests (#819)
The build was failing with this error after ActiveSupport was updated:

Downloading activerecord-3.2.14 revealed dependencies not in the API or the
lockfile (activesupport (= 3.2.14), activemodel (= 3.2.14), arel (~> 3.0.2),
tzinfo (~> 0.3.29)).
Either installing with `--full-index` or running `bundle update activerecord`
should fix the problem.

This PR fixes that.
2024-09-13 20:02:38 -05:00
dependabot[bot]
e6634ef461 chore(deps): bump activesupport from 7.0.4.1 to 7.0.7.1 in /tests/ruby (#804)
Bumps [activesupport](https://github.com/rails/rails) from 7.0.4.1 to 7.0.7.1.
- [Release notes](https://github.com/rails/rails/releases)
- [Changelog](https://github.com/rails/rails/blob/v7.2.1/activesupport/CHANGELOG.md)
- [Commits](https://github.com/rails/rails/compare/v7.0.4.1...v7.0.7.1)

---
updated-dependencies:
- dependency-name: activesupport
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-13 19:43:26 -05:00
dependabot[bot]
dab2e58647 chore(deps): bump helm/chart-releaser-action from 1.5.0 to 1.6.0 (#812)
Bumps [helm/chart-releaser-action](https://github.com/helm/chart-releaser-action) from 1.5.0 to 1.6.0.
- [Release notes](https://github.com/helm/chart-releaser-action/releases)
- [Commits](be16258da8...a917fd15b2)

---
updated-dependencies:
- dependency-name: helm/chart-releaser-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-13 19:41:25 -05:00
dependabot[bot]
4aaa4378cf chore(deps): bump rexml from 3.2.8 to 3.3.6 in /tests/ruby (#803)
Bumps [rexml](https://github.com/ruby/rexml) from 3.2.8 to 3.3.6.
- [Release notes](https://github.com/ruby/rexml/releases)
- [Changelog](https://github.com/ruby/rexml/blob/master/NEWS.md)
- [Commits](https://github.com/ruby/rexml/compare/v3.2.8...v3.3.6)

---
updated-dependencies:
- dependency-name: rexml
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-13 19:19:30 -05:00
Andrew Jackson
670311daf9 Implement Trust Authentication (#805)
* Implement Trust Authentication

* Remove remaining LDAP stuff

* Reverted LDAP changes, Cleaned up tests

---------

Co-authored-by: Andrew Jackson <andrewjackson2988@gmail.com>
Co-authored-by: CommanderKeynes <andrewjackson947@gmail.coma>
2024-09-10 09:29:45 -05:00
dependabot[bot]
b9ec7f8036 chore(deps): bump actions/setup-python from 4.1.0 to 5.1.0 (#715)
Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4.1.0 to 5.1.0.
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](https://github.com/actions/setup-python/compare/v4.1.0...v5.1.0)

---
updated-dependencies:
- dependency-name: actions/setup-python
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-07 12:21:21 -05:00
dependabot[bot]
d91d23848b chore(deps): bump helm/kind-action from 1.7.0 to 1.10.0 (#732)
Bumps [helm/kind-action](https://github.com/helm/kind-action) from 1.7.0 to 1.10.0.
- [Release notes](https://github.com/helm/kind-action/releases)
- [Commits](https://github.com/helm/kind-action/compare/v1.7.0...v1.10.0)

---
updated-dependencies:
- dependency-name: helm/kind-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-07 12:20:38 -05:00
dependabot[bot]
bbbc01a467 chore(deps): bump rexml from 3.2.5 to 3.2.8 in /tests/ruby (#743)
Bumps [rexml](https://github.com/ruby/rexml) from 3.2.5 to 3.2.8.
- [Release notes](https://github.com/ruby/rexml/releases)
- [Changelog](https://github.com/ruby/rexml/blob/master/NEWS.md)
- [Commits](https://github.com/ruby/rexml/compare/v3.2.5...v3.2.8)

---
updated-dependencies:
- dependency-name: rexml
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-07 12:20:01 -05:00
Sebastian Serth
9bb71ede9d Automatically build deb package on a new version tag (#801)
In #796, I noticed that the deb package was not being built because the automation was missing.

With this PR, I add the missing automation.

I tested the workflow in my repo...

    when starting the workflow manually: https://github.com/MrSerth/pgcat/actions/runs/10737879151/job/29780286094
    when drafting a new release: https://github.com/MrSerth/pgcat/actions/runs/10737835796/job/29780146212

Obviously, both workflows failed since I cannot upload to the APT repo. However, the version substitution for the workflow is working correctly (as shown when collapsing the first line of the "Build and release package" step).
2024-09-06 09:11:52 -05:00
Sebastian Serth
88b2afb19b Automatically start systemd service if config file is present (#800)
Previously, upgrading the deb package stopped the service but didn't re-enable it after a successful upgrade. This made upgrading more difficult and required a second step to restart the service. With this commit, the systemd service is automatically started when the default config file is present.
2024-09-06 09:07:01 -05:00
Mostafa Abdelraouf
f0865ca616 Improve Prometheus exporter output (#795)
* Prometheus metrics updates:

 * Add username label to deconflict metrics that would otherwise
   have duplicate labels across different pools.
 * Group metrics by name and only print HELP and TYPE once per
   metric name.
 * Sort labels for a deterministic output.

---------

Co-authored-by: Curtis Myzie <curtis.myzie@gmail.com>
Co-authored-by: Towhid Khan
2024-09-05 08:58:18 -05:00
Andrew Jackson
7d047c6c19 Implemented python tests with pytest (#790)
Currently the Python tests act as scripts. A lot of output is generated to stdout, which makes it very hard to figure out where problems were. Also, if you want to run only a single test, you basically need to comment out code to accomplish this.

This PR migrates the Python tests to the pytest testing framework. pytest allows individual tests to be targeted via the command line, without touching the source code. It also suppresses stdout by default, making the test output much easier to read, and after the tests run it provides a summary of what failed, what succeeded, etc.


Co-authored-by: CommanderKeynes <andrewjackson947@gmail.coma>
Co-authored-by: Andrew Jackson <andrewjackson2988@gmail.com>
2024-09-05 08:16:45 -05:00
Andrew Jackson
f73d15f82c Fix CI script to allow consecutive runs locally (#793)
Co-authored-by: CommanderKeynes <andrewjackson947@gmail.coma>
2024-09-05 08:01:33 -05:00
Mostafa Abdelraouf
69af6cc5e5 Make iterating on integration tests easier (#789)
Writing and iterating on integration tests is cumbersome: having to wait 10 minutes for the test suite to run just to see whether your test works is unacceptable.

In this PR, I added a detailed workflow for writing tests that should shorten the feedback cycle to as little as a few seconds.

It involves opening a shell into a long-lived container that has all the necessary setup and dependencies, then running your desired tests directly there. I added a convenience script that bootstraps the environment and opens an interactive shell into the container, so you can run tests immediately in an environment that is more or less identical to what we have running in CircleCI.
2024-09-03 11:15:53 -05:00
27 changed files with 701 additions and 314 deletions

@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
 PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i

 # Start Toxiproxy
+kill -9 $(pgrep toxiproxy) || true
 LOG_LEVEL=error toxiproxy-server &
 sleep 1
@@ -106,7 +107,7 @@ cd ../..
 # These tests will start and stop the pgcat server so it will need to be restarted after the tests
 #
 pip3 install -r tests/python/requirements.txt
-python3 tests/python/tests.py || exit 1
+pytest || exit 1
 #
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
 # Allow for graceful shutdown
 sleep 1
+
+kill -9 $(pgrep toxiproxy)
+sleep 1

@@ -22,7 +22,7 @@ jobs:
       # Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
       # yamllint (https://github.com/adrienverge/yamllint) which require Python
       - name: Set up Python
-        uses: actions/setup-python@v4.1.0
+        uses: actions/setup-python@v5.1.0
         with:
           python-version: 3.7
@@ -43,7 +43,7 @@ jobs:
         run: ct lint --config ct.yaml

       - name: Create kind cluster
-        uses: helm/kind-action@v1.7.0
+        uses: helm/kind-action@v1.10.0
         if: steps.list-changed.outputs.changed == 'true'

      - name: Run chart-testing (install)

@@ -32,7 +32,7 @@ jobs:
           version: v3.13.0

       - name: Run chart-releaser
-        uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
+        uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
         with:
           charts_dir: charts
           config: cr.yaml

@@ -1,6 +1,9 @@
 name: pgcat package (deb)

 on:
+  push:
+    tags:
+      - v*
   workflow_dispatch:
     inputs:
       packageVersion:
@@ -16,6 +19,14 @@ jobs:
     runs-on: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v3
+      - name: Set package version
+        if: github.event_name == 'push' # For push event
+        run: |
+          TAG=${{ github.ref_name }}
+          echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
+      - name: Set package version (manual dispatch)
+        if: github.event_name == 'workflow_dispatch' # For manual dispatch
+        run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
       - uses: actions-rs/toolchain@v1
         with:
           toolchain: stable
@@ -39,10 +50,10 @@ jobs:
             export ARCH=arm64
           fi
-          bash utilities/deb.sh ${{ inputs.packageVersion }}
+          bash utilities/deb.sh ${{ env.packageVersion }}
           deb-s3 upload \
             --lock \
             --bucket apt.postgresml.org \
-            pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
+            pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
             --codename $(lsb_release -cs)
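With the new `push` trigger, publishing a deb package no longer requires a manual dispatch; pushing a version tag is enough. A hypothetical example (the tag name is illustrative):

```
git tag v1.2.0
git push origin v1.2.0   # triggers the workflow; the leading "v" is stripped, so packageVersion becomes 1.2.0
```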

.gitignore
@@ -10,4 +10,6 @@ lcov.info
 dev/.bash_history
 dev/cache
 !dev/cache/.keepme
 .venv
+**/__pycache__
+.bundle

@@ -36,10 +36,11 @@ Port at which prometheus exporter listens on.

 ### connect_timeout
 ```
 path: general.connect_timeout
-default: 5000 # milliseconds
+default: 1000 # milliseconds
 ```
-How long to wait before aborting a server connection (ms).
+How long the client waits to obtain a server connection before aborting (ms).
+This is similar to PgBouncer's `query_wait_timeout`.

 ### idle_timeout
 ```
@@ -462,10 +463,18 @@ path: pools.<pool_name>.users.<user_index>.pool_size
 default: 9
 ```
-Maximum number of server connections that can be established for this user
+Maximum number of server connections that can be established for this user.
 The maximum number of connection from a single Pgcat process to any database in the cluster
 is the sum of pool_size across all users.

+### min_pool_size
+```
+path: pools.<pool_name>.users.<user_index>.min_pool_size
+default: 0
+```
+Minimum number of idle server connections to retain for this pool.
+
 ### statement_timeout
 ```
@@ -475,6 +484,16 @@ default: 0
 Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
 0 means it is disabled.

+### connect_timeout
+```
+path: pools.<pool_name>.users.<user_index>.connect_timeout
+default: <UNSET> # milliseconds
+```
+How long the client waits to obtain a server connection before aborting (ms).
+This is similar to PgBouncer's `query_wait_timeout`.
+If unset, uses the `connect_timeout` defined globally.
+
 ## `pools.<pool_name>.shards.<shard_index>` Section

 ### servers
@@ -502,4 +521,3 @@ default: "shard0"
 ```
 Database name (e.g. "postgres")
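Putting the new per-user settings together, a user entry might look like the following sketch (pool and user names are placeholders; values are illustrative):

```
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
pool_size = 10
min_pool_size = 2        # keep at least 2 idle server connections around
connect_timeout = 3000   # per-user override; falls back to general.connect_timeout if unset
```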

@@ -7,7 +7,7 @@ Thank you for contributing! Just a few tips here:
 3. Performance is important, make sure there are no regressions in your branch vs. `main`.

 ## How to run the integration tests locally and iterate on them

 We have integration tests written in Ruby, Python, Go and Rust.
 Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
 Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
 Once the environment is ready, you can run the tests by running

 Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
-Python: `cd /app && python3 tests/python/tests.py`
+Python: `cd /app/ && pytest`
 Rust: `cd /app/tests/rust && cargo run`
 Go: `cd /app/tests/go && /usr/local/go/bin/go test`
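Because pytest can select tests by name, a single test can now be run without editing any source, for example (using the test file added in this change):

```
cd /app
pytest tests/python/test_auth.py -k test_admin_trust_auth -v
```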

Cargo.lock
@@ -192,12 +192,11 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"

 [[package]]
 name = "bb8"
-version = "0.8.1"
+version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
+checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
 dependencies = [
  "async-trait",
- "futures-channel",
  "futures-util",
  "parking_lot",
  "tokio",

@@ -8,7 +8,7 @@ edition = "2021"
 tokio = { version = "1", features = ["full"] }
 bytes = "1"
 md-5 = "0.10"
-bb8 = "0.8.1"
+bb8 = "=0.8.6"
 async-trait = "0.1"
 rand = "0.8"
 chrono = "0.4"

@@ -2,7 +2,7 @@ apiVersion: v2
 name: pgcat
 description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
 maintainers:
-  - name: Wildcard
-    email: support@w6d.io
+  - name: PostgresML
+    email: team@postgresml.org
 appVersion: "1.2.0"
-version: 0.2.0
+version: 0.2.5

@@ -15,6 +15,7 @@ stringData:
     connect_timeout = {{ .Values.configuration.general.connect_timeout }}
     idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
     server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
+    server_tls = {{ .Values.configuration.general.server_tls }}
     idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
     healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
     healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
@@ -58,11 +59,21 @@ stringData:
     ##
     [pools.{{ $pool.name | quote }}.users.{{ $index }}]
     username = {{ $user.username | quote }}
+    {{- if $user.password }}
     password = {{ $user.password | quote }}
+    {{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
+    {{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
+    {{- if $secret }}
+    {{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
+    password = {{ $password | quote }}
+    {{- end }}
+    {{- end }}
     pool_size = {{ $user.pool_size }}
-    statement_timeout = {{ $user.statement_timeout }}
+    statement_timeout = {{ default 0 $user.statement_timeout }}
-    min_pool_size = 3
+    min_pool_size = {{ default 3 $user.min_pool_size }}
-    server_lifetime = 60000
+    {{- if $user.server_lifetime }}
+    server_lifetime = {{ $user.server_lifetime }}
+    {{- end }}
     {{- if and $user.server_username $user.server_password }}
     server_username = {{ $user.server_username | quote }}
     server_password = {{ $user.server_password | quote }}
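On the values side, a user that pulls its password from an existing Kubernetes secret would be declared roughly like this (the secret name and key are hypothetical):

```
users:
  - username: sharding_user
    # No `password` set; the chart looks it up in the named secret instead.
    passwordSecret:
      name: pgcat-credentials
      key: sharding-user-password
    pool_size: 10
```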

@@ -175,6 +175,9 @@ configuration:
     # Max connection lifetime before it's closed, even if actively used.
     server_lifetime: 86400000 # 24 hours

+    # Whether to use TLS for server connections or not.
+    server_tls: false
+
     # How long a client is allowed to be idle while in a transaction (ms).
     idle_client_in_transaction_timeout: 0 # milliseconds
@@ -315,7 +318,9 @@ configuration:
 #      ## Credentials for users that may connect to this cluster
 #      ## @param users [array]
 #      ## @param users[0].username Name of the env var (required)
-#      ## @param users[0].password Value for the env var (required)
+#      ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
+#      ## @param users[0].passwordSecret.name Name of the secret containing the password
+#      ## @param users[0].passwordSecret.key Key in the secret containing the password
 #      ## @param users[0].pool_size Maximum number of server connections that can be established for this user
 #      ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
 #      users: []

@@ -1 +1,2 @@
 sign: false
+pages_branch: main

@@ -179,7 +179,7 @@ primary_reads_enabled = true
 # `random`: picks a shard at random
 # `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
 # `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
-# no_shard_specified_behavior = "shard_0"
+# default_shard = "shard_0"

 # So what if you wanted to implement a different hashing function,
 # or you've already built one and you want this pooler to use it?

@@ -7,3 +7,7 @@ systemctl enable pgcat
 if ! id pgcat 2> /dev/null; then
   useradd -s /usr/bin/false pgcat
 fi
+
+if [ -f /etc/pgcat.toml ]; then
+  systemctl start pgcat
+fi

@@ -1,3 +1,4 @@
+use crate::config::AuthType;
 use crate::errors::Error;
 use crate::pool::ConnectionPool;
 use crate::server::Server;
@@ -71,6 +72,7 @@ impl AuthPassthrough {
     pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
         let auth_user = crate::config::User {
             username: self.user.clone(),
+            auth_type: AuthType::MD5,
             password: Some(self.password.clone()),
             server_username: None,
             server_password: None,

@@ -14,7 +14,9 @@ use tokio::sync::mpsc::Sender;

 use crate::admin::{generate_server_parameters_for_admin, handle_admin};
 use crate::auth_passthrough::refetch_auth_hash;
-use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
+use crate::config::{
+    get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
+};
 use crate::constants::*;
 use crate::messages::*;
 use crate::plugins::PluginOutput;
@@ -463,8 +465,8 @@ where
             .count()
             == 1;

-        // Kick any client that's not admin while we're in admin-only mode.
         if !admin && admin_only {
+            // Kick any client that's not admin while we're in admin-only mode.
             debug!(
                 "Rejecting non-admin connection to {} when in admin only mode",
                 pool_name
@@ -481,72 +483,76 @@ where
         let process_id: i32 = rand::random();
         let secret_key: i32 = rand::random();

-        // Perform MD5 authentication.
-        // TODO: Add SASL support.
-        let salt = md5_challenge(&mut write).await?;
-
-        let code = match read.read_u8().await {
-            Ok(p) => p,
-            Err(_) => {
-                return Err(Error::ClientSocketError(
-                    "password code".into(),
-                    client_identifier,
-                ))
-            }
-        };
-
-        // PasswordMessage
-        if code as char != 'p' {
-            return Err(Error::ProtocolSyncError(format!(
-                "Expected p, got {}",
-                code as char
-            )));
-        }
-
-        let len = match read.read_i32().await {
-            Ok(len) => len,
-            Err(_) => {
-                return Err(Error::ClientSocketError(
-                    "password message length".into(),
-                    client_identifier,
-                ))
-            }
-        };
-
-        let mut password_response = vec![0u8; (len - 4) as usize];
-
-        match read.read_exact(&mut password_response).await {
-            Ok(_) => (),
-            Err(_) => {
-                return Err(Error::ClientSocketError(
-                    "password message".into(),
-                    client_identifier,
-                ))
-            }
-        };
-
         let mut prepared_statements_enabled = false;

         // Authenticate admin user.
         let (transaction_mode, mut server_parameters) = if admin {
             let config = get_config();
-
-            // Compare server and client hashes.
-            let password_hash = md5_hash_password(
-                &config.general.admin_username,
-                &config.general.admin_password,
-                &salt,
-            );
-
-            if password_hash != password_response {
-                let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
-
-                warn!("{}", error);
-                wrong_password(&mut write, username).await?;
-
-                return Err(error);
-            }
+            // TODO: Add SASL support.
+            // Perform MD5 authentication.
+            match config.general.admin_auth_type {
+                AuthType::Trust => (),
+                AuthType::MD5 => {
+                    let salt = md5_challenge(&mut write).await?;
+
+                    let code = match read.read_u8().await {
+                        Ok(p) => p,
+                        Err(_) => {
+                            return Err(Error::ClientSocketError(
+                                "password code".into(),
+                                client_identifier,
+                            ))
+                        }
+                    };
+
+                    // PasswordMessage
+                    if code as char != 'p' {
+                        return Err(Error::ProtocolSyncError(format!(
+                            "Expected p, got {}",
+                            code as char
+                        )));
+                    }
+
+                    let len = match read.read_i32().await {
+                        Ok(len) => len,
+                        Err(_) => {
+                            return Err(Error::ClientSocketError(
+                                "password message length".into(),
+                                client_identifier,
+                            ))
+                        }
+                    };
+
+                    let mut password_response = vec![0u8; (len - 4) as usize];
+                    match read.read_exact(&mut password_response).await {
+                        Ok(_) => (),
+                        Err(_) => {
+                            return Err(Error::ClientSocketError(
+                                "password message".into(),
+                                client_identifier,
+                            ))
+                        }
+                    };
+
+                    // Compare server and client hashes.
+                    let password_hash = md5_hash_password(
+                        &config.general.admin_username,
+                        &config.general.admin_password,
+                        &salt,
+                    );
+
+                    if password_hash != password_response {
+                        let error =
+                            Error::ClientGeneralError("Invalid password".into(), client_identifier);
+                        warn!("{}", error);
+                        wrong_password(&mut write, username).await?;
+                        return Err(error);
+                    }
+                }
+            }

             (false, generate_server_parameters_for_admin())
         }
         // Authenticate normal user.
@@ -573,92 +579,143 @@ where
             // Obtain the hash to compare, we give preference to that written in cleartext in config
             // if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
             // when the pool was created. If there is no hash there, we try to fetch it one more time.
-            let password_hash = if let Some(password) = &pool.settings.user.password {
-                Some(md5_hash_password(username, password, &salt))
-            } else {
-                if !get_config().is_auth_query_configured() {
-                    wrong_password(&mut write, username).await?;
-                    return Err(Error::ClientAuthImpossible(username.into()));
-                }
-
-                let mut hash = (*pool.auth_hash.read()).clone();
-
-                if hash.is_none() {
-                    warn!(
-                        "Query auth configured \
-                        but no hash password found \
-                        for pool {}. Will try to refetch it.",
-                        pool_name
-                    );
-
-                    match refetch_auth_hash(&pool).await {
-                        Ok(fetched_hash) => {
-                            warn!("Password for {}, obtained. Updating.", client_identifier);
-                            {
-                                let mut pool_auth_hash = pool.auth_hash.write();
-                                *pool_auth_hash = Some(fetched_hash.clone());
-                            }
-                            hash = Some(fetched_hash);
-                        }
-                        Err(err) => {
-                            wrong_password(&mut write, username).await?;
-                            return Err(Error::ClientAuthPassthroughError(
-                                err.to_string(),
-                                client_identifier,
-                            ));
-                        }
-                    }
-                };
-
-                Some(md5_hash_second_pass(&hash.unwrap(), &salt))
-            };
-
-            // Once we have the resulting hash, we compare with what the client gave us.
-            // If they do not match and auth query is set up, we try to refetch the hash one more time
-            // to see if the password has changed since the pool was created.
-            //
-            // @TODO: we could end up fetching again the same password twice (see above).
-            if password_hash.unwrap() != password_response {
-                warn!(
-                    "Invalid password {}, will try to refetch it.",
-                    client_identifier
-                );
-
-                let fetched_hash = match refetch_auth_hash(&pool).await {
-                    Ok(fetched_hash) => fetched_hash,
-                    Err(err) => {
-                        wrong_password(&mut write, username).await?;
-                        return Err(err);
-                    }
-                };
-
-                let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
-
-                // Ok password changed in server an auth is possible.
-                if new_password_hash == password_response {
-                    warn!(
-                        "Password for {}, changed in server. Updating.",
-                        client_identifier
-                    );
-                    {
-                        let mut pool_auth_hash = pool.auth_hash.write();
-                        *pool_auth_hash = Some(fetched_hash);
-                    }
-                } else {
-                    wrong_password(&mut write, username).await?;
-                    return Err(Error::ClientGeneralError(
-                        "Invalid password".into(),
-                        client_identifier,
-                    ));
-                }
-            }
+            match pool.settings.user.auth_type {
+                AuthType::Trust => (),
+                AuthType::MD5 => {
+                    // Perform MD5 authentication.
+                    // TODO: Add SASL support.
+                    let salt = md5_challenge(&mut write).await?;
+
+                    let code = match read.read_u8().await {
+                        Ok(p) => p,
+                        Err(_) => {
+                            return Err(Error::ClientSocketError(
+                                "password code".into(),
+                                client_identifier,
+                            ))
+                        }
+                    };
+
+                    // PasswordMessage
+                    if code as char != 'p' {
+                        return Err(Error::ProtocolSyncError(format!(
+                            "Expected p, got {}",
+                            code as char
+                        )));
+                    }
+
+                    let len = match read.read_i32().await {
+                        Ok(len) => len,
+                        Err(_) => {
+                            return Err(Error::ClientSocketError(
+                                "password message length".into(),
+                                client_identifier,
+                            ))
+                        }
+                    };
+
+                    let mut password_response = vec![0u8; (len - 4) as usize];
+                    match read.read_exact(&mut password_response).await {
+                        Ok(_) => (),
+                        Err(_) => {
+                            return Err(Error::ClientSocketError(
+                                "password message".into(),
+                                client_identifier,
+                            ))
+                        }
+                    };
+
+                    let password_hash = if let Some(password) = &pool.settings.user.password {
+                        Some(md5_hash_password(username, password, &salt))
+                    } else {
+                        if !get_config().is_auth_query_configured() {
+                            wrong_password(&mut write, username).await?;
+                            return Err(Error::ClientAuthImpossible(username.into()));
+                        }
+
+                        let mut hash = (*pool.auth_hash.read()).clone();
+
+                        if hash.is_none() {
+                            warn!(
+                                "Query auth configured \
+                                but no hash password found \
+                                for pool {}. Will try to refetch it.",
+                                pool_name
+                            );
+
+                            match refetch_auth_hash(&pool).await {
+                                Ok(fetched_hash) => {
+                                    warn!(
+                                        "Password for {}, obtained. Updating.",
+                                        client_identifier
+                                    );
+                                    {
+                                        let mut pool_auth_hash = pool.auth_hash.write();
+                                        *pool_auth_hash = Some(fetched_hash.clone());
+                                    }
+                                    hash = Some(fetched_hash);
+                                }
+                                Err(err) => {
+                                    wrong_password(&mut write, username).await?;
+                                    return Err(Error::ClientAuthPassthroughError(
+                                        err.to_string(),
+                                        client_identifier,
+                                    ));
+                                }
+                            }
+                        };
+
+                        Some(md5_hash_second_pass(&hash.unwrap(), &salt))
+                    };
+
+                    // Once we have the resulting hash, we compare with what the client gave us.
+                    // If they do not match and auth query is set up, we try to refetch the hash one more time
+                    // to see if the password has changed since the pool was created.
+                    //
+                    // @TODO: we could end up fetching again the same password twice (see above).
+                    if password_hash.unwrap() != password_response {
+                        warn!(
+                            "Invalid password {}, will try to refetch it.",
+                            client_identifier
+                        );

+                        let fetched_hash = match refetch_auth_hash(&pool).await {
+                            Ok(fetched_hash) => fetched_hash,
+                            Err(err) => {
+                                wrong_password(&mut write, username).await?;
+                                return Err(err);
+                            }
+                        };
+
+                        let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
+
+                        // Ok password changed in server an auth is possible.
+                        if new_password_hash == password_response {
+                            warn!(
+                                "Password for {}, changed in server. Updating.",
+                                client_identifier
+                            );
+                            {
+                                let mut pool_auth_hash = pool.auth_hash.write();
+                                *pool_auth_hash = Some(fetched_hash);
+                            }
+                        } else {
+                            wrong_password(&mut write, username).await?;
+                            return Err(Error::ClientGeneralError(
+                                "Invalid password".into(),
+                                client_identifier,
+                            ));
+                        }
+                    }
+                }
+            }

             let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
             prepared_statements_enabled =
                 transaction_mode && pool.prepared_statement_cache.is_some();
@@ -824,6 +881,7 @@ where
         };

         query_router.update_pool_settings(&pool.settings);
+        query_router.set_default_role();

         // Our custom protocol loop.
         // We expect the client to either start a transaction with regular queries
@@ -208,6 +208,9 @@ impl Address {
 pub struct User {
     pub username: String,
     pub password: Option<String>,
+    #[serde(default = "User::default_auth_type")]
+    pub auth_type: AuthType,
     pub server_username: Option<String>,
     pub server_password: Option<String>,
     pub pool_size: u32,
@@ -225,6 +228,7 @@ impl Default for User {
         User {
             username: String::from("postgres"),
             password: None,
+            auth_type: AuthType::MD5,
             server_username: None,
             server_password: None,
             pool_size: 15,
@@ -239,6 +243,10 @@ impl Default for User {
 }

 impl User {
+    pub fn default_auth_type() -> AuthType {
+        AuthType::MD5
+    }
+
     fn validate(&self) -> Result<(), Error> {
         if let Some(min_pool_size) = self.min_pool_size {
             if min_pool_size > self.pool_size {
@@ -334,6 +342,9 @@ pub struct General {
     pub admin_username: String,
     pub admin_password: String,

+    #[serde(default = "General::default_admin_auth_type")]
+    pub admin_auth_type: AuthType,
+
     #[serde(default = "General::default_validate_config")]
     pub validate_config: bool,
@@ -348,6 +359,10 @@ impl General {
         "0.0.0.0".into()
     }

+    pub fn default_admin_auth_type() -> AuthType {
+        AuthType::MD5
+    }
+
     pub fn default_port() -> u16 {
         5432
     }
@@ -456,6 +471,7 @@ impl Default for General {
             verify_server_certificate: false,
             admin_username: String::from("admin"),
             admin_password: String::from("admin"),
+            admin_auth_type: AuthType::MD5,
             validate_config: true,
             auth_query: None,
             auth_query_user: None,
@@ -476,6 +492,15 @@ pub enum PoolMode {
     Session,
 }

+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
+pub enum AuthType {
+    #[serde(alias = "trust", alias = "Trust")]
+    Trust,
+    #[serde(alias = "md5", alias = "MD5")]
+    MD5,
+}
+
 impl std::fmt::Display for PoolMode {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
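For reference, the new enum maps onto pgcat.toml like this, mirroring the config used by the Python test added below (the aliases `trust`/`Trust` and `md5`/`MD5` are both accepted):

```
[general]
admin_username = "admin_user"
admin_password = ""         # ignored under trust
admin_auth_type = "trust"

[pools.sharded_db.users.0]
username = "sharding_user"
auth_type = "trust"         # no MD5 challenge is sent to this user
pool_size = 10
```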

@@ -200,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {

 impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        let formatted_labels = self
-            .labels
+        let mut sorted_labels: Vec<_> = self.labels.iter().collect();
+        sorted_labels.sort_by_key(|&(key, _)| key);
+        let formatted_labels = sorted_labels
             .iter()
             .map(|(key, value)| format!("{}=\"{}\"", key, value))
             .collect::<Vec<_>>()
             .join(",");
         write!(
             f,
-            "# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
+            "{name}{{{formatted_labels}}} {value}",
             name = format_args!("pgcat_{}", self.name),
-            help = self.help,
-            ty = self.ty,
             formatted_labels = formatted_labels,
             value = self.value
         )
@@ -247,7 +246,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
         labels.insert("pool", address.pool_name.clone());
         labels.insert("index", address.address_index.to_string());
         labels.insert("database", address.database.to_string());
-        labels.insert("user", address.username.clone());
+        labels.insert("username", address.username.clone());

         Self::from_name(&format!("databases_{}", name), value, labels)
     }
@@ -264,7 +263,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
         labels.insert("pool", address.pool_name.clone());
         labels.insert("index", address.address_index.to_string());
         labels.insert("database", address.database.to_string());
-        labels.insert("user", address.username.clone());
+        labels.insert("username", address.username.clone());
+
         Self::from_name(&format!("servers_{}", name), value, labels)
     }
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
         labels.insert("role", address.role.to_string());
         labels.insert("index", address.address_index.to_string());
         labels.insert("database", address.database.to_string());
-        labels.insert("user", address.username.clone());
+        labels.insert("username", address.username.clone());

         Self::from_name(&format!("stats_{}", name), value, labels)
     }
@@ -288,6 +288,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
         Self::from_name(&format!("pools_{}", name), value, labels)
     }
+
+    fn get_header(&self) -> String {
+        format!(
+            "\n# HELP {name} {help}\n# TYPE {name} {ty}",
+            name = format_args!("pgcat_{}", self.name),
+            help = self.help,
+            ty = self.ty,
+        )
+    }
 }

 async fn prometheus_stats(
@@ -300,6 +309,7 @@ async fn prometheus_stats(
     push_pool_stats(&mut lines);
     push_server_stats(&mut lines);
     push_database_stats(&mut lines);
+    lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.

     Response::builder()
         .header("content-type", "text/plain; version=0.0.4")
@@ -313,6 +323,7 @@ async fn prometheus_stats(

 // Adds metrics shown in a SHOW STATS admin command.
 fn push_address_stats(lines: &mut Vec<String>) {
+    let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
     for (_, pool) in get_all_pools() {
         for shard in 0..pool.shards() {
             for server in 0..pool.servers(shard) {
@@ -322,7 +333,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
                     if let Some(prometheus_metric) =
                         PrometheusMetric::<u64>::from_address(address, &key, value)
                     {
-                        lines.push(prometheus_metric.to_string());
+                        grouped_metrics
+                            .entry(key)
+                            .or_default()
+                            .push(prometheus_metric);
                     } else {
                         debug!("Metric {} not implemented for {}", key, address.name());
                     }
@@ -330,33 +344,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
             }
         }
     }
+    for (_key, metrics) in grouped_metrics {
+        if !metrics.is_empty() {
+            lines.push(metrics[0].get_header());
+            for metric in metrics {
+                lines.push(metric.to_string());
+            }
+        }
+    }
 }

 // Adds relevant metrics shown in a SHOW POOLS admin command.
 fn push_pool_stats(lines: &mut Vec<String>) {
+    let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
     let pool_stats = PoolStats::construct_pool_lookup();
     for (pool_id, stats) in pool_stats.iter() {
         for (name, value) in stats.clone() {
             if let Some(prometheus_metric) =
                 PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
             {
-                lines.push(prometheus_metric.to_string());
+                grouped_metrics
+                    .entry(name)
+                    .or_default()
+                    .push(prometheus_metric);
             } else {
                 debug!("Metric {} not implemented for ({})", name, *pool_id);
             }
         }
     }
+    for (_key, metrics) in grouped_metrics {
+        if !metrics.is_empty() {
+            lines.push(metrics[0].get_header());
+            for metric in metrics {
+                lines.push(metric.to_string());
+            }
+        }
+    }
 }

 // Adds relevant metrics shown in a SHOW DATABASES admin command.
 fn push_database_stats(lines: &mut Vec<String>) {
+    let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
     for (_, pool) in get_all_pools() {
         let pool_config = pool.settings.clone();
         for shard in 0..pool.shards() {
             for server in 0..pool.servers(shard) {
                 let address = pool.address(shard, server);
                 let pool_state = pool.pool_state(shard, server);

                 let metrics = vec![
                     ("pool_size", pool_config.user.pool_size),
                     ("current_connections", pool_state.connections),
@@ -365,7 +399,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
                     if let Some(prometheus_metric) =
                         PrometheusMetric::<u32>::from_database_info(address, key, value)
                     {
-                        lines.push(prometheus_metric.to_string());
+                        grouped_metrics
+                            .entry(key.to_string())
+                            .or_default()
+                            .push(prometheus_metric);
                     } else {
                         debug!("Metric {} not implemented for {}", key, address.name());
                     }
@@ -373,6 +410,14 @@ fn push_database_stats(lines: &mut Vec<String>) {
             }
         }
     }
+    for (_key, metrics) in grouped_metrics {
+        if !metrics.is_empty() {
+            lines.push(metrics[0].get_header());
+            for metric in metrics {
+                lines.push(metric.to_string());
+            }
+        }
+    }
 }

 // Adds relevant metrics shown in a SHOW SERVERS admin command.
@@ -405,7 +450,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
                 crate::stats::ServerState::Idle => entry.idle_count += 1,
             }
         }
     }
-
+    let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
     for (_, pool) in get_all_pools() {
         for shard in 0..pool.shards() {
             for server in 0..pool.servers(shard) {
@@ -428,7 +473,10 @@ fn push_server_stats(lines: &mut Vec<String>) {
                     if let Some(prometheus_metric) =
                         PrometheusMetric::<u64>::from_server_info(address, key, value)
                     {
-                        lines.push(prometheus_metric.to_string());
+                        grouped_metrics
+                            .entry(key.to_string())
+                            .or_default()
+                            .push(prometheus_metric);
                     } else {
                         debug!("Metric {} not implemented for {}", key, address.name());
                     }
@@ -437,6 +485,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
             }
         }
     }
+    for (_key, metrics) in grouped_metrics {
+        if !metrics.is_empty() {
+            lines.push(metrics[0].get_header());
+            for metric in metrics {
+                lines.push(metric.to_string());
+            }
+        }
+    }
 }

 pub async fn start_metric_server(http_addr: SocketAddr) {
@@ -386,6 +386,18 @@ impl QueryRouter {
         }
     }

+    /// Determines if a query is a mutation or not.
+    fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
+        use sqlparser::ast::*;
+
+        match q.body.as_ref() {
+            SetExpr::Insert(_) => true,
+            SetExpr::Update(_) => true,
+            SetExpr::Query(q) => Self::is_mutation_query(q),
+            _ => false,
+        }
+    }
+
     /// Try to infer which server to connect to based on the contents of the query.
     pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
         if !self.pool_settings.query_parser_read_write_splitting {
@@ -428,8 +440,9 @@ impl QueryRouter {
                     };

                     let has_locks = !query.locks.is_empty();
+                    let has_mutation = Self::is_mutation_query(query);

-                    if has_locks {
+                    if has_locks || has_mutation {
                         self.active_role = Some(Role::Primary);
                     } else if !visited_write_statement {
                         // If we already visited a write statement, we should be going to the primary.
@@ -1048,6 +1061,11 @@ impl QueryRouter {
         self.active_shard
     }

+    /// Set active_role as the default_role specified in the pool.
+    pub fn set_default_role(&mut self) {
+        self.active_role = self.pool_settings.default_role;
+    }
+
     /// Get the current desired server role we should be talking to.
     pub fn role(&self) -> Option<Role> {
         self.active_role
@@ -1113,6 +1131,26 @@ mod test {
         assert_eq!(qr.role(), None);
     }

+    #[test]
+    fn test_split_cte_queries() {
+        QueryRouter::setup();
+        let mut qr = QueryRouter::new();
+        qr.pool_settings.query_parser_read_write_splitting = true;
+        qr.pool_settings.query_parser_enabled = true;
+
+        let query = simple_query(
+            "WITH t AS (
+                SELECT id FROM users WHERE name ILIKE '%ja%'
+            )
+            UPDATE user_languages
+            SET settings = '{}'
+            FROM t WHERE t.id = user_id;",
+        );
+        let ast = qr.parse(&query).unwrap();
+        assert!(qr.infer(&ast).is_ok());
+        assert_eq!(qr.role(), Some(Role::Primary));
+    }
+
     #[test]
     fn test_infer_replica() {
         QueryRouter::setup();
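For intuition, the effect of `is_mutation_query` is that a statement whose body is an INSERT or UPDATE is sent to the primary even when it starts with a CTE, while a CTE whose body is a plain SELECT can still go to a replica. Illustrative queries against hypothetical tables:

```
-- Routed to the primary: the query body is an UPDATE.
WITH recent AS (SELECT id FROM users WHERE created_at > now() - interval '1 day')
UPDATE profiles SET active = true FROM recent WHERE recent.id = profiles.user_id;

-- Can still be routed to a replica: the query body is a plain SELECT.
WITH recent AS (SELECT id FROM users WHERE created_at > now() - interval '1 day')
SELECT count(*) FROM recent;
```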

@@ -23,11 +23,11 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement

 echo "Interactive test environment ready"
 echo "To run integration tests, you can use the following commands:"
 echo -e "  ${BLUE}Ruby:   ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
-echo -e "  ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
+echo -e "  ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
 echo -e "  ${BLUE}Rust:   ${RED}cd /app/tests/rust && cargo run ${RESET}"
 echo -e "  ${BLUE}Go:     ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
 echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
 echo "You can rebuild PgCat from within the container by running"
 echo -e "  ${GREEN}cargo build${RESET}"
 echo "and then run the tests again"
 echo "==================================="

@@ -1,2 +1,3 @@
+pytest
 psycopg2==2.9.3
 psutil==5.9.1

tests/python/test_auth.py (new file)
@@ -0,0 +1,71 @@
import utils
import signal


class TestTrustAuth:
    @classmethod
    def setup_method(cls):
        config = """
[general]
host = "0.0.0.0"
port = 6432
admin_username = "admin_user"
admin_password = ""
admin_auth_type = "trust"

[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
auth_type = "trust"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"

[pools.sharded_db.shards.0]
servers = [
    [ "127.0.0.1", 5432, "primary" ],
]
database = "shard0"
"""
        utils.pgcat_generic_start(config)

    @classmethod
    def teardown_method(self):
        utils.pg_cat_send_signal(signal.SIGTERM)

    def test_admin_trust_auth(self):
        conn, cur = utils.connect_db_trust(admin=True)
        cur.execute("SHOW POOLS")
        res = cur.fetchall()
        print(res)
        utils.cleanup_conn(conn, cur)

    def test_normal_trust_auth(self):
        conn, cur = utils.connect_db_trust(autocommit=False)
        cur.execute("SELECT 1")
        res = cur.fetchall()
        print(res)
        utils.cleanup_conn(conn, cur)


class TestMD5Auth:
    @classmethod
    def setup_method(cls):
        utils.pgcat_start()

    @classmethod
    def teardown_method(self):
        utils.pg_cat_send_signal(signal.SIGTERM)

    def test_normal_db_access(self):
        conn, cur = utils.connect_db(autocommit=False)
        cur.execute("SELECT 1")
        res = cur.fetchall()
        print(res)
        utils.cleanup_conn(conn, cur)

    def test_admin_db_access(self):
        conn, cur = utils.connect_db(admin=True)
        cur.execute("SHOW POOLS")
        res = cur.fetchall()
        print(res)
        utils.cleanup_conn(conn, cur)

View File

@@ -1,84 +1,12 @@
-from typing import Tuple
-import psycopg2
-import psutil
-import os
import signal
import time
+import psycopg2
+import utils

SHUTDOWN_TIMEOUT = 5

-PGCAT_HOST = "127.0.0.1"
-PGCAT_PORT = "6432"
-
-def pgcat_start():
-    pg_cat_send_signal(signal.SIGTERM)
-    os.system("./target/debug/pgcat .circleci/pgcat.toml &")
-    time.sleep(2)
-
-def pg_cat_send_signal(signal: signal.Signals):
-    try:
-        for proc in psutil.process_iter(["pid", "name"]):
-            if "pgcat" == proc.name():
-                os.kill(proc.pid, signal)
-    except Exception as e:
-        # The process can be gone when we send this signal
-        print(e)
-    if signal == signal.SIGTERM:
-        # pgrep returns 0 if a pgcat process still exists
-        time.sleep(2)
-        if not os.system('pgrep pgcat'):
-            raise Exception("pgcat not closed after SIGTERM")
-
-def connect_db(
-    autocommit: bool = True,
-    admin: bool = False,
-) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
-    if admin:
-        user = "admin_user"
-        password = "admin_pass"
-        db = "pgcat"
-    else:
-        user = "sharding_user"
-        password = "sharding_user"
-        db = "sharded_db"
-    conn = psycopg2.connect(
-        f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
-        connect_timeout=2,
-    )
-    conn.autocommit = autocommit
-    cur = conn.cursor()
-    return (conn, cur)
-
-def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
-    cur.close()
-    conn.close()
-
-def test_normal_db_access():
-    pgcat_start()
-    conn, cur = connect_db(autocommit=False)
-    cur.execute("SELECT 1")
-    res = cur.fetchall()
-    print(res)
-    cleanup_conn(conn, cur)
-
-def test_admin_db_access():
-    conn, cur = connect_db(admin=True)
-    cur.execute("SHOW POOLS")
-    res = cur.fetchall()
-    print(res)
-    cleanup_conn(conn, cur)

def test_shutdown_logic():
@@ -86,17 +14,17 @@ def test_shutdown_logic():
    # NO ACTIVE QUERIES SIGINT HANDLING

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and send query (not in transaction)
-    conn, cur = connect_db()
+    conn, cur = utils.connect_db()

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")
    cur.execute("COMMIT;")

    # Send sigint to pgcat
-    pg_cat_send_signal(signal.SIGINT)
+    utils.pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # Check that any new queries fail after sigint since server should close with no active transactions
@@ -108,18 +36,18 @@ def test_shutdown_logic():
        # Fail if query execution succeeded
        raise Exception("Server not closed after sigint")

-    cleanup_conn(conn, cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(conn, cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and begin transaction
-    conn, cur = connect_db()
-    admin_conn, admin_cur = connect_db(admin=True)
+    conn, cur = utils.connect_db()
+    admin_conn, admin_cur = utils.connect_db(admin=True)

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")
@@ -138,24 +66,24 @@ def test_shutdown_logic():
        # Fail if query execution succeeded
        raise Exception("Server not closed after sigint")

-    cleanup_conn(conn, cur)
-    cleanup_conn(admin_conn, admin_cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(conn, cur)
+    utils.cleanup_conn(admin_conn, admin_cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # HANDLE TRANSACTION WITH SIGINT

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and begin transaction
-    conn, cur = connect_db()
+    conn, cur = utils.connect_db()

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
-    pg_cat_send_signal(signal.SIGINT)
+    utils.pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # Check that any new queries succeed after sigint since server should still allow transaction to complete
@@ -165,18 +93,18 @@ def test_shutdown_logic():
        # Fail if query fails since server closed
        raise Exception("Server closed while in transaction", e.pgerror)

-    cleanup_conn(conn, cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(conn, cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and begin transaction
-    conn, cur = connect_db()
-    admin_conn, admin_cur = connect_db(admin=True)
+    conn, cur = utils.connect_db()
+    admin_conn, admin_cur = utils.connect_db(admin=True)

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")
@@ -194,30 +122,30 @@ def test_shutdown_logic():
        # Fail if query fails since server closed
        raise Exception("Server closed while in transaction", e.pgerror)

-    cleanup_conn(conn, cur)
-    cleanup_conn(admin_conn, admin_cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(conn, cur)
+    utils.cleanup_conn(admin_conn, admin_cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and begin transaction
-    transaction_conn, transaction_cur = connect_db()
+    transaction_conn, transaction_cur = utils.connect_db()

    transaction_cur.execute("BEGIN;")
    transaction_cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
-    pg_cat_send_signal(signal.SIGINT)
+    utils.pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    start = time.perf_counter()
    try:
-        conn, cur = connect_db()
+        conn, cur = utils.connect_db()
        cur.execute("SELECT 1;")
-        cleanup_conn(conn, cur)
+        utils.cleanup_conn(conn, cur)
    except psycopg2.OperationalError as e:
        time_taken = time.perf_counter() - start
        if time_taken > 0.1:
@@ -227,49 +155,49 @@ def test_shutdown_logic():
    else:
        raise Exception("Able to connect to database during shutdown")

-    cleanup_conn(transaction_conn, transaction_cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(transaction_conn, transaction_cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and begin transaction
-    transaction_conn, transaction_cur = connect_db()
+    transaction_conn, transaction_cur = utils.connect_db()

    transaction_cur.execute("BEGIN;")
    transaction_cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
-    pg_cat_send_signal(signal.SIGINT)
+    utils.pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    try:
-        conn, cur = connect_db(admin=True)
+        conn, cur = utils.connect_db(admin=True)
        cur.execute("SHOW DATABASES;")
-        cleanup_conn(conn, cur)
+        utils.cleanup_conn(conn, cur)
    except psycopg2.OperationalError as e:
        raise Exception(e)

-    cleanup_conn(transaction_conn, transaction_cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(transaction_conn, transaction_cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and begin transaction
-    transaction_conn, transaction_cur = connect_db()
+    transaction_conn, transaction_cur = utils.connect_db()

    transaction_cur.execute("BEGIN;")
    transaction_cur.execute("SELECT 1;")

-    admin_conn, admin_cur = connect_db(admin=True)
+    admin_conn, admin_cur = utils.connect_db(admin=True)
    admin_cur.execute("SHOW DATABASES;")

    # Send sigint to pgcat while still in transaction
-    pg_cat_send_signal(signal.SIGINT)
+    utils.pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    try:
@@ -277,24 +205,24 @@ def test_shutdown_logic():
    except psycopg2.OperationalError as e:
        raise Exception("Could not execute admin command:", e)

-    cleanup_conn(transaction_conn, transaction_cur)
-    cleanup_conn(admin_conn, admin_cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(transaction_conn, transaction_cur)
+    utils.cleanup_conn(admin_conn, admin_cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # HANDLE SHUTDOWN TIMEOUT WITH SIGINT

    # Start pgcat
-    pgcat_start()
+    utils.pgcat_start()

    # Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
-    conn, cur = connect_db()
+    conn, cur = utils.connect_db()

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
-    pg_cat_send_signal(signal.SIGINT)
+    utils.pg_cat_send_signal(signal.SIGINT)

    # pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
    time.sleep(SHUTDOWN_TIMEOUT + 1)
@@ -308,12 +236,7 @@ def test_shutdown_logic():
        # Fail if query execution succeeded
        raise Exception("Server not closed after sigint and expected timeout")

-    cleanup_conn(conn, cur)
-    pg_cat_send_signal(signal.SIGTERM)
+    utils.cleanup_conn(conn, cur)
+    utils.pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
-
-test_normal_db_access()
-test_admin_db_access()
-test_shutdown_logic()

tests/python/utils.py (new file, 110 lines)
View File

@@ -0,0 +1,110 @@
import os
import signal
import tempfile
import time
from typing import Tuple

import psutil
import psycopg2

PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"


def _pgcat_start(config_path: str):
    pg_cat_send_signal(signal.SIGTERM)
    os.system(f"./target/debug/pgcat {config_path} &")
    time.sleep(2)


def pgcat_start():
    _pgcat_start(config_path='.circleci/pgcat.toml')


def pgcat_generic_start(config: str):
    # Write the config to a named temp file; pgcat reads it during the
    # two-second startup wait inside _pgcat_start, so the file outlives its use.
    tmp = tempfile.NamedTemporaryFile()
    with open(tmp.name, 'w') as f:
        f.write(config)
    _pgcat_start(config_path=tmp.name)


def glauth_send_signal(signal: signal.Signals):
    try:
        for proc in psutil.process_iter(["pid", "name"]):
            if proc.name() == "glauth":
                os.kill(proc.pid, signal)
    except Exception as e:
        # The process can be gone when we send this signal
        print(e)
    if signal == signal.SIGTERM:
        # pgrep returns 0 if a glauth process still exists
        time.sleep(2)
        if not os.system('pgrep glauth'):
            raise Exception("glauth not closed after SIGTERM")


def pg_cat_send_signal(signal: signal.Signals):
    try:
        for proc in psutil.process_iter(["pid", "name"]):
            if "pgcat" == proc.name():
                os.kill(proc.pid, signal)
    except Exception as e:
        # The process can be gone when we send this signal
        print(e)
    if signal == signal.SIGTERM:
        # pgrep returns 0 if a pgcat process still exists
        time.sleep(2)
        if not os.system('pgrep pgcat'):
            raise Exception("pgcat not closed after SIGTERM")


def connect_db(
    autocommit: bool = True,
    admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
    if admin:
        user = "admin_user"
        password = "admin_pass"
        db = "pgcat"
    else:
        user = "sharding_user"
        password = "sharding_user"
        db = "sharded_db"
    conn = psycopg2.connect(
        f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
        connect_timeout=2,
    )
    conn.autocommit = autocommit
    cur = conn.cursor()
    return (conn, cur)


def connect_db_trust(
    autocommit: bool = True,
    admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
    # Same as connect_db, but omits the password: used against pools whose
    # auth_type is "trust".
    if admin:
        user = "admin_user"
        db = "pgcat"
    else:
        user = "sharding_user"
        db = "sharded_db"
    conn = psycopg2.connect(
        f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
        connect_timeout=2,
    )
    conn.autocommit = autocommit
    cur = conn.cursor()
    return (conn, cur)


def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
    cur.close()
    conn.close()
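
Taken together, these helpers support two start-up paths: pgcat_start() boots pgcat with the checked-in CI config, while pgcat_generic_start() accepts an inline TOML string, which is what test_auth.py uses. A hypothetical new test could combine them as in the sketch below; the config values here are illustrative, not taken from the diff, except that the admin credentials must match the ones connect_db hardcodes.

import signal

import utils

config = """
[general]
host = "0.0.0.0"
port = 6432
admin_username = "admin_user"
admin_password = "admin_pass"

[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
pool_size = 5
pool_mode = "transaction"

[pools.sharded_db.shards.0]
servers = [[ "127.0.0.1", 5432, "primary" ]]
database = "shard0"
"""

utils.pgcat_generic_start(config)         # writes config to a temp file and starts pgcat
conn, cur = utils.connect_db(admin=True)  # connect to the admin database
cur.execute("SHOW POOLS")
print(cur.fetchall())
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)  # stop pgcat and verify it exited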

View File

@@ -1,22 +1,33 @@
GEM GEM
remote: https://rubygems.org/ remote: https://rubygems.org/
specs: specs:
activemodel (7.0.4.1) activemodel (7.1.4)
activesupport (= 7.0.4.1) activesupport (= 7.1.4)
activerecord (7.0.4.1) activerecord (7.1.4)
activemodel (= 7.0.4.1) activemodel (= 7.1.4)
activesupport (= 7.0.4.1) activesupport (= 7.1.4)
activesupport (7.0.4.1) timeout (>= 0.4.0)
activesupport (7.1.4)
base64
bigdecimal
concurrent-ruby (~> 1.0, >= 1.0.2) concurrent-ruby (~> 1.0, >= 1.0.2)
connection_pool (>= 2.2.5)
drb
i18n (>= 1.6, < 2) i18n (>= 1.6, < 2)
minitest (>= 5.1) minitest (>= 5.1)
mutex_m
tzinfo (~> 2.0) tzinfo (~> 2.0)
ast (2.4.2) ast (2.4.2)
concurrent-ruby (1.1.10) base64 (0.2.0)
bigdecimal (3.1.8)
concurrent-ruby (1.3.4)
connection_pool (2.4.1)
diff-lcs (1.5.0) diff-lcs (1.5.0)
i18n (1.12.0) drb (2.2.1)
i18n (1.14.5)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
minitest (5.17.0) minitest (5.25.1)
mutex_m (0.2.0)
parallel (1.22.1) parallel (1.22.1)
parser (3.1.2.0) parser (3.1.2.0)
ast (~> 2.4.1) ast (~> 2.4.1)
@@ -24,7 +35,8 @@ GEM
pg (1.3.2) pg (1.3.2)
rainbow (3.1.1) rainbow (3.1.1)
regexp_parser (2.3.1) regexp_parser (2.3.1)
rexml (3.2.5) rexml (3.3.6)
strscan
rspec (3.11.0) rspec (3.11.0)
rspec-core (~> 3.11.0) rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0) rspec-expectations (~> 3.11.0)
@@ -50,10 +62,12 @@ GEM
rubocop-ast (1.17.0) rubocop-ast (1.17.0)
parser (>= 3.1.1.0) parser (>= 3.1.1.0)
ruby-progressbar (1.11.0) ruby-progressbar (1.11.0)
strscan (3.1.0)
timeout (0.4.1)
toml (0.3.0) toml (0.3.0)
parslet (>= 1.8.0, < 3.0.0) parslet (>= 1.8.0, < 3.0.0)
toxiproxy (2.0.1) toxiproxy (2.0.1)
tzinfo (2.0.5) tzinfo (2.0.6)
concurrent-ruby (~> 1.0) concurrent-ruby (~> 1.0)
unicode-display_width (2.1.0) unicode-display_width (2.1.0)

View File

@@ -56,6 +56,41 @@ describe "Random Load Balancing" do
      end
    end
  end

+  context "when all replicas are down " do
+    let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
+
+    it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
+      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+      failed_count = 0
+      number_of_replicas = processes[:replicas].length
+
+      # Take down all replicas
+      processes[:replicas].each(&:take_down)
+
+      (number_of_replicas + 1).times do |n|
+        conn.async_exec("SELECT 1 + 2")
+      rescue
+        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+        failed_count += 1
+      end
+
+      expect(failed_count).to eq(number_of_replicas + 1)
+      failed_count = 0
+
+      # ban_time is configured to 60 so this reset will only work
+      # if the replicas are unbanned automatically
+      processes[:replicas].each(&:reset)
+
+      number_of_replicas.times do
+        conn.async_exec("SELECT 1 + 2")
+      rescue
+        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+        failed_count += 1
+      end
+
+      expect(failed_count).to eq(0)
+    end
+  end
end

describe "Least Outstanding Queries Load Balancing" do
@@ -161,4 +196,3 @@ describe "Least Outstanding Queries Load Balancing" do
    end
  end
end
-
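
For readers unfamiliar with the banning mechanics this spec exercises: pgcat temporarily bans a server after failed health checks, and the test above asserts that when every replica ends up banned, the bans are lifted automatically rather than leaving the pool unusable for the full ban_time. A rough Python sketch of that rule follows (illustrative only; the real logic lives in pgcat's Rust pool code):

def usable_servers(servers, banned):
    candidates = [s for s in servers if s not in banned]
    if not candidates:
        # All replicas are banned: assume the bans are false positives and
        # clear them rather than failing every query until ban_time expires.
        banned.clear()
        candidates = list(servers)
    return candidates

servers = ["replica-1", "replica-2", "replica-3"]
banned = set(servers)                   # health checks banned everything
print(usable_servers(servers, banned))  # all replicas become usable again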