mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
Compare commits
31 Commits
mostafa_ad
...
circleci_O
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16a2cece21 | ||
|
|
0ee59c0c40 | ||
|
|
b61d2cc6f0 | ||
|
|
c11418c083 | ||
|
|
c9544bdff2 | ||
|
|
cdcfa99fb9 | ||
|
|
f27dc6b483 | ||
|
|
326efc22b3 | ||
|
|
01c6afb2e5 | ||
|
|
a68071dd28 | ||
|
|
c27d801abf | ||
|
|
186e72298f | ||
|
|
3935366d86 | ||
|
|
b575935b1d | ||
|
|
efbab1c333 | ||
|
|
9f12d7958e | ||
|
|
e6634ef461 | ||
|
|
dab2e58647 | ||
|
|
4aaa4378cf | ||
|
|
670311daf9 | ||
|
|
b9ec7f8036 | ||
|
|
d91d23848b | ||
|
|
bbbc01a467 | ||
|
|
9bb71ede9d | ||
|
|
88b2afb19b | ||
|
|
f0865ca616 | ||
|
|
7d047c6c19 | ||
|
|
f73d15f82c | ||
|
|
69af6cc5e5 | ||
|
|
ca34597002 | ||
|
|
2def40ea6a |
@@ -59,6 +59,7 @@ admin_password = "admin_pass"
|
|||||||
# session: one server connection per connected client
|
# session: one server connection per connected client
|
||||||
# transaction: one server connection per client transaction
|
# transaction: one server connection per client transaction
|
||||||
pool_mode = "transaction"
|
pool_mode = "transaction"
|
||||||
|
prepared_statements_cache_size = 500
|
||||||
|
|
||||||
# If the client doesn't specify, route traffic to
|
# If the client doesn't specify, route traffic to
|
||||||
# this role by default.
|
# this role by default.
|
||||||
@@ -141,6 +142,7 @@ query_parser_enabled = true
|
|||||||
query_parser_read_write_splitting = true
|
query_parser_read_write_splitting = true
|
||||||
primary_reads_enabled = true
|
primary_reads_enabled = true
|
||||||
sharding_function = "pg_bigint_hash"
|
sharding_function = "pg_bigint_hash"
|
||||||
|
prepared_statements_cache_size = 500
|
||||||
|
|
||||||
[pools.simple_db.users.0]
|
[pools.simple_db.users.0]
|
||||||
username = "simple_user"
|
username = "simple_user"
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
|||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||||
|
|
||||||
# Start Toxiproxy
|
# Start Toxiproxy
|
||||||
|
kill -9 $(pgrep toxiproxy) || true
|
||||||
LOG_LEVEL=error toxiproxy-server &
|
LOG_LEVEL=error toxiproxy-server &
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
@@ -106,7 +107,7 @@ cd ../..
|
|||||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||||
#
|
#
|
||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
python3 tests/python/tests.py || exit 1
|
pytest || exit 1
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
|
|||||||
|
|
||||||
# Allow for graceful shutdown
|
# Allow for graceful shutdown
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
|
kill -9 $(pgrep toxiproxy)
|
||||||
|
sleep 1
|
||||||
|
|||||||
4
.github/workflows/chart-lint-test.yaml
vendored
4
.github/workflows/chart-lint-test.yaml
vendored
@@ -22,7 +22,7 @@ jobs:
|
|||||||
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
||||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4.1.0
|
uses: actions/setup-python@v5.1.0
|
||||||
with:
|
with:
|
||||||
python-version: 3.7
|
python-version: 3.7
|
||||||
|
|
||||||
@@ -43,7 +43,7 @@ jobs:
|
|||||||
run: ct lint --config ct.yaml
|
run: ct lint --config ct.yaml
|
||||||
|
|
||||||
- name: Create kind cluster
|
- name: Create kind cluster
|
||||||
uses: helm/kind-action@v1.7.0
|
uses: helm/kind-action@v1.10.0
|
||||||
if: steps.list-changed.outputs.changed == 'true'
|
if: steps.list-changed.outputs.changed == 'true'
|
||||||
|
|
||||||
- name: Run chart-testing (install)
|
- name: Run chart-testing (install)
|
||||||
|
|||||||
2
.github/workflows/chart-release.yaml
vendored
2
.github/workflows/chart-release.yaml
vendored
@@ -32,7 +32,7 @@ jobs:
|
|||||||
version: v3.13.0
|
version: v3.13.0
|
||||||
|
|
||||||
- name: Run chart-releaser
|
- name: Run chart-releaser
|
||||||
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
|
uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
|
||||||
with:
|
with:
|
||||||
charts_dir: charts
|
charts_dir: charts
|
||||||
config: cr.yaml
|
config: cr.yaml
|
||||||
|
|||||||
15
.github/workflows/publish-deb-package.yml
vendored
15
.github/workflows/publish-deb-package.yml
vendored
@@ -1,6 +1,9 @@
|
|||||||
name: pgcat package (deb)
|
name: pgcat package (deb)
|
||||||
|
|
||||||
on:
|
on:
|
||||||
|
push:
|
||||||
|
tags:
|
||||||
|
- v*
|
||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
inputs:
|
inputs:
|
||||||
packageVersion:
|
packageVersion:
|
||||||
@@ -16,6 +19,14 @@ jobs:
|
|||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
|
- name: Set package version
|
||||||
|
if: github.event_name == 'push' # For push event
|
||||||
|
run: |
|
||||||
|
TAG=${{ github.ref_name }}
|
||||||
|
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
|
||||||
|
- name: Set package version (manual dispatch)
|
||||||
|
if: github.event_name == 'workflow_dispatch' # For manual dispatch
|
||||||
|
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
|
||||||
- uses: actions-rs/toolchain@v1
|
- uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
toolchain: stable
|
toolchain: stable
|
||||||
@@ -39,10 +50,10 @@ jobs:
|
|||||||
export ARCH=arm64
|
export ARCH=arm64
|
||||||
fi
|
fi
|
||||||
|
|
||||||
bash utilities/deb.sh ${{ inputs.packageVersion }}
|
bash utilities/deb.sh ${{ env.packageVersion }}
|
||||||
|
|
||||||
deb-s3 upload \
|
deb-s3 upload \
|
||||||
--lock \
|
--lock \
|
||||||
--bucket apt.postgresml.org \
|
--bucket apt.postgresml.org \
|
||||||
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
||||||
--codename $(lsb_release -cs)
|
--codename $(lsb_release -cs)
|
||||||
|
|||||||
4
.gitignore
vendored
4
.gitignore
vendored
@@ -10,4 +10,6 @@ lcov.info
|
|||||||
dev/.bash_history
|
dev/.bash_history
|
||||||
dev/cache
|
dev/cache
|
||||||
!dev/cache/.keepme
|
!dev/cache/.keepme
|
||||||
.venv
|
.venv
|
||||||
|
**/__pycache__
|
||||||
|
.bundle
|
||||||
35
CONFIG.md
35
CONFIG.md
@@ -36,10 +36,11 @@ Port at which prometheus exporter listens on.
|
|||||||
### connect_timeout
|
### connect_timeout
|
||||||
```
|
```
|
||||||
path: general.connect_timeout
|
path: general.connect_timeout
|
||||||
default: 5000 # milliseconds
|
default: 1000 # milliseconds
|
||||||
```
|
```
|
||||||
|
|
||||||
How long to wait before aborting a server connection (ms).
|
How long the client waits to obtain a server connection before aborting (ms).
|
||||||
|
This is similar to PgBouncer's `query_wait_timeout`.
|
||||||
|
|
||||||
### idle_timeout
|
### idle_timeout
|
||||||
```
|
```
|
||||||
@@ -308,6 +309,15 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
|
|||||||
`replica` round-robin between replicas only without touching the primary,
|
`replica` round-robin between replicas only without touching the primary,
|
||||||
`primary` all queries go to the primary unless otherwise specified.
|
`primary` all queries go to the primary unless otherwise specified.
|
||||||
|
|
||||||
|
### replica_to_primary_failover_enabled
|
||||||
|
```
|
||||||
|
path: pools.<pool_name>.replica_to_primary_failover_enabled
|
||||||
|
default: "false"
|
||||||
|
```
|
||||||
|
|
||||||
|
If set to true, when the specified role is `replica` (either by setting `default_role` or manually)
|
||||||
|
and all replicas are banned, queries will be sent to the primary (until a replica is back online).
|
||||||
|
|
||||||
### prepared_statements_cache_size
|
### prepared_statements_cache_size
|
||||||
```
|
```
|
||||||
path: general.prepared_statements_cache_size
|
path: general.prepared_statements_cache_size
|
||||||
@@ -462,10 +472,18 @@ path: pools.<pool_name>.users.<user_index>.pool_size
|
|||||||
default: 9
|
default: 9
|
||||||
```
|
```
|
||||||
|
|
||||||
Maximum number of server connections that can be established for this user
|
Maximum number of server connections that can be established for this user.
|
||||||
The maximum number of connection from a single Pgcat process to any database in the cluster
|
The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||||
is the sum of pool_size across all users.
|
is the sum of pool_size across all users.
|
||||||
|
|
||||||
|
### min_pool_size
|
||||||
|
```
|
||||||
|
path: pools.<pool_name>.users.<user_index>.min_pool_size
|
||||||
|
default: 0
|
||||||
|
```
|
||||||
|
|
||||||
|
Minimum number of idle server connections to retain for this pool.
|
||||||
|
|
||||||
### statement_timeout
|
### statement_timeout
|
||||||
```
|
```
|
||||||
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
||||||
@@ -475,6 +493,16 @@ default: 0
|
|||||||
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
0 means it is disabled.
|
0 means it is disabled.
|
||||||
|
|
||||||
|
### connect_timeout
|
||||||
|
```
|
||||||
|
path: pools.<pool_name>.users.<user_index>.connect_timeout
|
||||||
|
default: <UNSET> # milliseconds
|
||||||
|
```
|
||||||
|
|
||||||
|
How long the client waits to obtain a server connection before aborting (ms).
|
||||||
|
This is similar to PgBouncer's `query_wait_timeout`.
|
||||||
|
If unset, uses the `connect_timeout` defined globally.
|
||||||
|
|
||||||
## `pools.<pool_name>.shards.<shard_index>` Section
|
## `pools.<pool_name>.shards.<shard_index>` Section
|
||||||
|
|
||||||
### servers
|
### servers
|
||||||
@@ -502,4 +530,3 @@ default: "shard0"
|
|||||||
```
|
```
|
||||||
|
|
||||||
Database name (e.g. "postgres")
|
Database name (e.g. "postgres")
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,32 @@ Thank you for contributing! Just a few tips here:
|
|||||||
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
||||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||||
|
|
||||||
|
## How to run the integration tests locally and iterate on them
|
||||||
|
We have integration tests written in Ruby, Python, Go and Rust.
|
||||||
|
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
||||||
|
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
||||||
|
|
||||||
|
|
||||||
|
Quite simply, make sure you have docker installed and then run
|
||||||
|
`./start_test_env.sh`
|
||||||
|
|
||||||
|
That is it!
|
||||||
|
|
||||||
|
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
|
||||||
|
|
||||||
|
Once the environment is ready, you can run the tests by running
|
||||||
|
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||||
|
Python: `cd /app/ && pytest`
|
||||||
|
Rust: `cd /app/tests/rust && cargo run`
|
||||||
|
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||||
|
|
||||||
|
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
|
||||||
|
To rebuild PgCat, just run `cargo build` within the container under `/app`
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Happy hacking!
|
Happy hacking!
|
||||||
|
|
||||||
## TODOs
|
## TODOs
|
||||||
|
|||||||
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -192,12 +192,11 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bb8"
|
name = "bb8"
|
||||||
version = "0.8.1"
|
version = "0.8.6"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
|
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"futures-channel",
|
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ edition = "2021"
|
|||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
md-5 = "0.10"
|
md-5 = "0.10"
|
||||||
bb8 = "0.8.1"
|
bb8 = "=0.8.6"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
|
|||||||
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
|
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
|
||||||
|
|
||||||
### Failover
|
### Failover
|
||||||
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If `replica_to_primary_failover_enabled` is set to true and all replicas become banned, the query will be routed to the primary. If `replica_to_primary_failover_enabled` is false and all servers (replicas) become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
||||||
|
|
||||||
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
||||||
|
|
||||||
|
|||||||
@@ -5,4 +5,4 @@ maintainers:
|
|||||||
- name: Wildcard
|
- name: Wildcard
|
||||||
email: support@w6d.io
|
email: support@w6d.io
|
||||||
appVersion: "1.2.0"
|
appVersion: "1.2.0"
|
||||||
version: 0.2.0
|
version: 0.2.4
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ stringData:
|
|||||||
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
||||||
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
||||||
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
||||||
|
server_tls = {{ .Values.configuration.general.server_tls }}
|
||||||
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
||||||
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
||||||
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
||||||
@@ -58,11 +59,21 @@ stringData:
|
|||||||
##
|
##
|
||||||
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
||||||
username = {{ $user.username | quote }}
|
username = {{ $user.username | quote }}
|
||||||
|
{{- if $user.password }}
|
||||||
password = {{ $user.password | quote }}
|
password = {{ $user.password | quote }}
|
||||||
|
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
|
||||||
|
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
|
||||||
|
{{- if $secret }}
|
||||||
|
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
|
||||||
|
password = {{ $password | quote }}
|
||||||
|
{{- end }}
|
||||||
|
{{- end }}
|
||||||
pool_size = {{ $user.pool_size }}
|
pool_size = {{ $user.pool_size }}
|
||||||
statement_timeout = {{ $user.statement_timeout }}
|
statement_timeout = {{ default 0 $user.statement_timeout }}
|
||||||
min_pool_size = 3
|
min_pool_size = {{ default 3 $user.min_pool_size }}
|
||||||
server_lifetime = 60000
|
{{- if $user.server_lifetime }}
|
||||||
|
server_lifetime = {{ $user.server_lifetime }}
|
||||||
|
{{- end }}
|
||||||
{{- if and $user.server_username $user.server_password }}
|
{{- if and $user.server_username $user.server_password }}
|
||||||
server_username = {{ $user.server_username | quote }}
|
server_username = {{ $user.server_username | quote }}
|
||||||
server_password = {{ $user.server_password | quote }}
|
server_password = {{ $user.server_password | quote }}
|
||||||
|
|||||||
@@ -175,6 +175,9 @@ configuration:
|
|||||||
# Max connection lifetime before it's closed, even if actively used.
|
# Max connection lifetime before it's closed, even if actively used.
|
||||||
server_lifetime: 86400000 # 24 hours
|
server_lifetime: 86400000 # 24 hours
|
||||||
|
|
||||||
|
# Whether to use TLS for server connections or not.
|
||||||
|
server_tls: false
|
||||||
|
|
||||||
# How long a client is allowed to be idle while in a transaction (ms).
|
# How long a client is allowed to be idle while in a transaction (ms).
|
||||||
idle_client_in_transaction_timeout: 0 # milliseconds
|
idle_client_in_transaction_timeout: 0 # milliseconds
|
||||||
|
|
||||||
@@ -315,7 +318,9 @@ configuration:
|
|||||||
# ## Credentials for users that may connect to this cluster
|
# ## Credentials for users that may connect to this cluster
|
||||||
# ## @param users [array]
|
# ## @param users [array]
|
||||||
# ## @param users[0].username Name of the env var (required)
|
# ## @param users[0].username Name of the env var (required)
|
||||||
# ## @param users[0].password Value for the env var (required)
|
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
|
||||||
|
# ## @param users[0].passwordSecret.name Name of the secret containing the password
|
||||||
|
# ## @param users[0].passwordSecret.key Key in the secret containing the password
|
||||||
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
||||||
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
# users: []
|
# users: []
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ primary_reads_enabled = true
|
|||||||
# `random`: picks a shard at random
|
# `random`: picks a shard at random
|
||||||
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
||||||
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
||||||
# no_shard_specified_behavior = "shard_0"
|
# default_shard = "shard_0"
|
||||||
|
|
||||||
# So what if you wanted to implement a different hashing function,
|
# So what if you wanted to implement a different hashing function,
|
||||||
# or you've already built one and you want this pooler to use it?
|
# or you've already built one and you want this pooler to use it?
|
||||||
|
|||||||
4
postinst
4
postinst
@@ -7,3 +7,7 @@ systemctl enable pgcat
|
|||||||
if ! id pgcat 2> /dev/null; then
|
if ! id pgcat 2> /dev/null; then
|
||||||
useradd -s /usr/bin/false pgcat
|
useradd -s /usr/bin/false pgcat
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if [ -f /etc/pgcat.toml ]; then
|
||||||
|
systemctl start pgcat
|
||||||
|
fi
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use crate::config::AuthType;
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::pool::ConnectionPool;
|
use crate::pool::ConnectionPool;
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
@@ -71,6 +72,7 @@ impl AuthPassthrough {
|
|||||||
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
||||||
let auth_user = crate::config::User {
|
let auth_user = crate::config::User {
|
||||||
username: self.user.clone(),
|
username: self.user.clone(),
|
||||||
|
auth_type: AuthType::MD5,
|
||||||
password: Some(self.password.clone()),
|
password: Some(self.password.clone()),
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
|
|||||||
318
src/client.rs
318
src/client.rs
@@ -14,7 +14,9 @@ use tokio::sync::mpsc::Sender;
|
|||||||
|
|
||||||
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
||||||
use crate::auth_passthrough::refetch_auth_hash;
|
use crate::auth_passthrough::refetch_auth_hash;
|
||||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
use crate::config::{
|
||||||
|
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
|
||||||
|
};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::plugins::PluginOutput;
|
use crate::plugins::PluginOutput;
|
||||||
@@ -463,8 +465,8 @@ where
|
|||||||
.count()
|
.count()
|
||||||
== 1;
|
== 1;
|
||||||
|
|
||||||
// Kick any client that's not admin while we're in admin-only mode.
|
|
||||||
if !admin && admin_only {
|
if !admin && admin_only {
|
||||||
|
// Kick any client that's not admin while we're in admin-only mode.
|
||||||
debug!(
|
debug!(
|
||||||
"Rejecting non-admin connection to {} when in admin only mode",
|
"Rejecting non-admin connection to {} when in admin only mode",
|
||||||
pool_name
|
pool_name
|
||||||
@@ -481,72 +483,76 @@ where
|
|||||||
let process_id: i32 = rand::random();
|
let process_id: i32 = rand::random();
|
||||||
let secret_key: i32 = rand::random();
|
let secret_key: i32 = rand::random();
|
||||||
|
|
||||||
// Perform MD5 authentication.
|
|
||||||
// TODO: Add SASL support.
|
|
||||||
let salt = md5_challenge(&mut write).await?;
|
|
||||||
|
|
||||||
let code = match read.read_u8().await {
|
|
||||||
Ok(p) => p,
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password code".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// PasswordMessage
|
|
||||||
if code as char != 'p' {
|
|
||||||
return Err(Error::ProtocolSyncError(format!(
|
|
||||||
"Expected p, got {}",
|
|
||||||
code as char
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let len = match read.read_i32().await {
|
|
||||||
Ok(len) => len,
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message length".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
|
||||||
|
|
||||||
match read.read_exact(&mut password_response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut prepared_statements_enabled = false;
|
let mut prepared_statements_enabled = false;
|
||||||
|
|
||||||
// Authenticate admin user.
|
// Authenticate admin user.
|
||||||
let (transaction_mode, mut server_parameters) = if admin {
|
let (transaction_mode, mut server_parameters) = if admin {
|
||||||
let config = get_config();
|
let config = get_config();
|
||||||
|
// TODO: Add SASL support.
|
||||||
|
// Perform MD5 authentication.
|
||||||
|
match config.general.admin_auth_type {
|
||||||
|
AuthType::Trust => (),
|
||||||
|
AuthType::MD5 => {
|
||||||
|
let salt = md5_challenge(&mut write).await?;
|
||||||
|
|
||||||
// Compare server and client hashes.
|
let code = match read.read_u8().await {
|
||||||
let password_hash = md5_hash_password(
|
Ok(p) => p,
|
||||||
&config.general.admin_username,
|
Err(_) => {
|
||||||
&config.general.admin_password,
|
return Err(Error::ClientSocketError(
|
||||||
&salt,
|
"password code".into(),
|
||||||
);
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if password_hash != password_response {
|
// PasswordMessage
|
||||||
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
if code as char != 'p' {
|
||||||
|
return Err(Error::ProtocolSyncError(format!(
|
||||||
|
"Expected p, got {}",
|
||||||
|
code as char
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
warn!("{}", error);
|
let len = match read.read_i32().await {
|
||||||
wrong_password(&mut write, username).await?;
|
Ok(len) => len,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message length".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
return Err(error);
|
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||||
|
|
||||||
|
match read.read_exact(&mut password_response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Compare server and client hashes.
|
||||||
|
let password_hash = md5_hash_password(
|
||||||
|
&config.general.admin_username,
|
||||||
|
&config.general.admin_password,
|
||||||
|
&salt,
|
||||||
|
);
|
||||||
|
|
||||||
|
if password_hash != password_response {
|
||||||
|
let error =
|
||||||
|
Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
||||||
|
|
||||||
|
warn!("{}", error);
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(false, generate_server_parameters_for_admin())
|
(false, generate_server_parameters_for_admin())
|
||||||
}
|
}
|
||||||
// Authenticate normal user.
|
// Authenticate normal user.
|
||||||
@@ -573,92 +579,143 @@ where
|
|||||||
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
||||||
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
||||||
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
||||||
let password_hash = if let Some(password) = &pool.settings.user.password {
|
match pool.settings.user.auth_type {
|
||||||
Some(md5_hash_password(username, password, &salt))
|
AuthType::Trust => (),
|
||||||
} else {
|
AuthType::MD5 => {
|
||||||
if !get_config().is_auth_query_configured() {
|
// Perform MD5 authentication.
|
||||||
wrong_password(&mut write, username).await?;
|
// TODO: Add SASL support.
|
||||||
return Err(Error::ClientAuthImpossible(username.into()));
|
let salt = md5_challenge(&mut write).await?;
|
||||||
}
|
|
||||||
|
|
||||||
let mut hash = (*pool.auth_hash.read()).clone();
|
let code = match read.read_u8().await {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password code".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if hash.is_none() {
|
// PasswordMessage
|
||||||
warn!(
|
if code as char != 'p' {
|
||||||
"Query auth configured \
|
return Err(Error::ProtocolSyncError(format!(
|
||||||
but no hash password found \
|
"Expected p, got {}",
|
||||||
for pool {}. Will try to refetch it.",
|
code as char
|
||||||
pool_name
|
)));
|
||||||
);
|
}
|
||||||
|
|
||||||
match refetch_auth_hash(&pool).await {
|
let len = match read.read_i32().await {
|
||||||
Ok(fetched_hash) => {
|
Ok(len) => len,
|
||||||
warn!("Password for {}, obtained. Updating.", client_identifier);
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message length".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||||
|
|
||||||
|
match read.read_exact(&mut password_response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let password_hash = if let Some(password) = &pool.settings.user.password {
|
||||||
|
Some(md5_hash_password(username, password, &salt))
|
||||||
|
} else {
|
||||||
|
if !get_config().is_auth_query_configured() {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
return Err(Error::ClientAuthImpossible(username.into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut hash = (*pool.auth_hash.read()).clone();
|
||||||
|
|
||||||
|
if hash.is_none() {
|
||||||
|
warn!(
|
||||||
|
"Query auth configured \
|
||||||
|
but no hash password found \
|
||||||
|
for pool {}. Will try to refetch it.",
|
||||||
|
pool_name
|
||||||
|
);
|
||||||
|
|
||||||
|
match refetch_auth_hash(&pool).await {
|
||||||
|
Ok(fetched_hash) => {
|
||||||
|
warn!(
|
||||||
|
"Password for {}, obtained. Updating.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut pool_auth_hash = pool.auth_hash.write();
|
||||||
|
*pool_auth_hash = Some(fetched_hash.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
hash = Some(fetched_hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(Error::ClientAuthPassthroughError(
|
||||||
|
err.to_string(),
|
||||||
|
client_identifier,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
||||||
|
};
|
||||||
|
|
||||||
|
// Once we have the resulting hash, we compare with what the client gave us.
|
||||||
|
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
||||||
|
// to see if the password has changed since the pool was created.
|
||||||
|
//
|
||||||
|
// @TODO: we could end up fetching again the same password twice (see above).
|
||||||
|
if password_hash.unwrap() != password_response {
|
||||||
|
warn!(
|
||||||
|
"Invalid password {}, will try to refetch it.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
|
let fetched_hash = match refetch_auth_hash(&pool).await {
|
||||||
|
Ok(fetched_hash) => fetched_hash,
|
||||||
|
Err(err) => {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
||||||
|
|
||||||
|
// Ok password changed in server an auth is possible.
|
||||||
|
if new_password_hash == password_response {
|
||||||
|
warn!(
|
||||||
|
"Password for {}, changed in server. Updating.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut pool_auth_hash = pool.auth_hash.write();
|
let mut pool_auth_hash = pool.auth_hash.write();
|
||||||
*pool_auth_hash = Some(fetched_hash.clone());
|
*pool_auth_hash = Some(fetched_hash);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
hash = Some(fetched_hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(err) => {
|
|
||||||
wrong_password(&mut write, username).await?;
|
wrong_password(&mut write, username).await?;
|
||||||
|
return Err(Error::ClientGeneralError(
|
||||||
return Err(Error::ClientAuthPassthroughError(
|
"Invalid password".into(),
|
||||||
err.to_string(),
|
|
||||||
client_identifier,
|
client_identifier,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
|
||||||
};
|
|
||||||
|
|
||||||
// Once we have the resulting hash, we compare with what the client gave us.
|
|
||||||
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
|
||||||
// to see if the password has changed since the pool was created.
|
|
||||||
//
|
|
||||||
// @TODO: we could end up fetching again the same password twice (see above).
|
|
||||||
if password_hash.unwrap() != password_response {
|
|
||||||
warn!(
|
|
||||||
"Invalid password {}, will try to refetch it.",
|
|
||||||
client_identifier
|
|
||||||
);
|
|
||||||
|
|
||||||
let fetched_hash = match refetch_auth_hash(&pool).await {
|
|
||||||
Ok(fetched_hash) => fetched_hash,
|
|
||||||
Err(err) => {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
|
||||||
|
|
||||||
// Ok password changed in server an auth is possible.
|
|
||||||
if new_password_hash == password_response {
|
|
||||||
warn!(
|
|
||||||
"Password for {}, changed in server. Updating.",
|
|
||||||
client_identifier
|
|
||||||
);
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut pool_auth_hash = pool.auth_hash.write();
|
|
||||||
*pool_auth_hash = Some(fetched_hash);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
return Err(Error::ClientGeneralError(
|
|
||||||
"Invalid password".into(),
|
|
||||||
client_identifier,
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
||||||
prepared_statements_enabled =
|
prepared_statements_enabled =
|
||||||
transaction_mode && pool.prepared_statement_cache.is_some();
|
transaction_mode && pool.prepared_statement_cache.is_some();
|
||||||
@@ -824,6 +881,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
query_router.update_pool_settings(&pool.settings);
|
query_router.update_pool_settings(&pool.settings);
|
||||||
|
query_router.set_default_role();
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
|
|||||||
@@ -208,6 +208,9 @@ impl Address {
|
|||||||
pub struct User {
|
pub struct User {
|
||||||
pub username: String,
|
pub username: String,
|
||||||
pub password: Option<String>,
|
pub password: Option<String>,
|
||||||
|
|
||||||
|
#[serde(default = "User::default_auth_type")]
|
||||||
|
pub auth_type: AuthType,
|
||||||
pub server_username: Option<String>,
|
pub server_username: Option<String>,
|
||||||
pub server_password: Option<String>,
|
pub server_password: Option<String>,
|
||||||
pub pool_size: u32,
|
pub pool_size: u32,
|
||||||
@@ -225,6 +228,7 @@ impl Default for User {
|
|||||||
User {
|
User {
|
||||||
username: String::from("postgres"),
|
username: String::from("postgres"),
|
||||||
password: None,
|
password: None,
|
||||||
|
auth_type: AuthType::MD5,
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
pool_size: 15,
|
pool_size: 15,
|
||||||
@@ -239,6 +243,10 @@ impl Default for User {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
|
pub fn default_auth_type() -> AuthType {
|
||||||
|
AuthType::MD5
|
||||||
|
}
|
||||||
|
|
||||||
fn validate(&self) -> Result<(), Error> {
|
fn validate(&self) -> Result<(), Error> {
|
||||||
if let Some(min_pool_size) = self.min_pool_size {
|
if let Some(min_pool_size) = self.min_pool_size {
|
||||||
if min_pool_size > self.pool_size {
|
if min_pool_size > self.pool_size {
|
||||||
@@ -334,6 +342,9 @@ pub struct General {
|
|||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_admin_auth_type")]
|
||||||
|
pub admin_auth_type: AuthType,
|
||||||
|
|
||||||
#[serde(default = "General::default_validate_config")]
|
#[serde(default = "General::default_validate_config")]
|
||||||
pub validate_config: bool,
|
pub validate_config: bool,
|
||||||
|
|
||||||
@@ -348,6 +359,10 @@ impl General {
|
|||||||
"0.0.0.0".into()
|
"0.0.0.0".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn default_admin_auth_type() -> AuthType {
|
||||||
|
AuthType::MD5
|
||||||
|
}
|
||||||
|
|
||||||
pub fn default_port() -> u16 {
|
pub fn default_port() -> u16 {
|
||||||
5432
|
5432
|
||||||
}
|
}
|
||||||
@@ -456,6 +471,7 @@ impl Default for General {
|
|||||||
verify_server_certificate: false,
|
verify_server_certificate: false,
|
||||||
admin_username: String::from("admin"),
|
admin_username: String::from("admin"),
|
||||||
admin_password: String::from("admin"),
|
admin_password: String::from("admin"),
|
||||||
|
admin_auth_type: AuthType::MD5,
|
||||||
validate_config: true,
|
validate_config: true,
|
||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
@@ -476,6 +492,15 @@ pub enum PoolMode {
|
|||||||
Session,
|
Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
|
||||||
|
pub enum AuthType {
|
||||||
|
#[serde(alias = "trust", alias = "Trust")]
|
||||||
|
Trust,
|
||||||
|
|
||||||
|
#[serde(alias = "md5", alias = "MD5")]
|
||||||
|
MD5,
|
||||||
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for PoolMode {
|
impl std::fmt::Display for PoolMode {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
@@ -516,6 +541,9 @@ pub struct Pool {
|
|||||||
#[serde(default = "Pool::default_default_role")]
|
#[serde(default = "Pool::default_default_role")]
|
||||||
pub default_role: String,
|
pub default_role: String,
|
||||||
|
|
||||||
|
#[serde(default)] // False
|
||||||
|
pub replica_to_primary_failover_enabled: bool,
|
||||||
|
|
||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
pub query_parser_enabled: bool,
|
pub query_parser_enabled: bool,
|
||||||
|
|
||||||
@@ -709,6 +737,7 @@ impl Default for Pool {
|
|||||||
pool_mode: Self::default_pool_mode(),
|
pool_mode: Self::default_pool_mode(),
|
||||||
load_balancing_mode: Self::default_load_balancing_mode(),
|
load_balancing_mode: Self::default_load_balancing_mode(),
|
||||||
default_role: String::from("any"),
|
default_role: String::from("any"),
|
||||||
|
replica_to_primary_failover_enabled: false,
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
query_parser_read_write_splitting: false,
|
query_parser_read_write_splitting: false,
|
||||||
|
|||||||
51
src/pool.rs
51
src/pool.rs
@@ -162,6 +162,9 @@ pub struct PoolSettings {
|
|||||||
// Default server role to connect to.
|
// Default server role to connect to.
|
||||||
pub default_role: Option<Role>,
|
pub default_role: Option<Role>,
|
||||||
|
|
||||||
|
// Whether or not we should use primary when replicas are unavailable
|
||||||
|
pub replica_to_primary_failover_enabled: bool,
|
||||||
|
|
||||||
// Enable/disable query parser.
|
// Enable/disable query parser.
|
||||||
pub query_parser_enabled: bool,
|
pub query_parser_enabled: bool,
|
||||||
|
|
||||||
@@ -219,6 +222,7 @@ impl Default for PoolSettings {
|
|||||||
user: User::default(),
|
user: User::default(),
|
||||||
db: String::default(),
|
db: String::default(),
|
||||||
default_role: None,
|
default_role: None,
|
||||||
|
replica_to_primary_failover_enabled: false,
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
query_parser_read_write_splitting: false,
|
query_parser_read_write_splitting: false,
|
||||||
@@ -531,6 +535,8 @@ impl ConnectionPool {
|
|||||||
"primary" => Some(Role::Primary),
|
"primary" => Some(Role::Primary),
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
},
|
},
|
||||||
|
replica_to_primary_failover_enabled: pool_config
|
||||||
|
.replica_to_primary_failover_enabled,
|
||||||
query_parser_enabled: pool_config.query_parser_enabled,
|
query_parser_enabled: pool_config.query_parser_enabled,
|
||||||
query_parser_max_length: pool_config.query_parser_max_length,
|
query_parser_max_length: pool_config.query_parser_max_length,
|
||||||
query_parser_read_write_splitting: pool_config
|
query_parser_read_write_splitting: pool_config
|
||||||
@@ -731,6 +737,19 @@ impl ConnectionPool {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the role is replica and we allow sending traffic to primary when replicas are unavailble,
|
||||||
|
// we add primary address at the end of the list of candidates, this way it will be tried when
|
||||||
|
// replicas are all unavailable.
|
||||||
|
if role == Role::Replica && self.settings.replica_to_primary_failover_enabled {
|
||||||
|
let mut primaries = self
|
||||||
|
.addresses
|
||||||
|
.iter()
|
||||||
|
.flatten()
|
||||||
|
.filter(|address| address.role == Role::Primary)
|
||||||
|
.collect::<Vec<&Address>>();
|
||||||
|
candidates.insert(0, primaries.pop().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
// Indicate we're waiting on a server connection from a pool.
|
// Indicate we're waiting on a server connection from a pool.
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
client_stats.waiting();
|
client_stats.waiting();
|
||||||
@@ -935,24 +954,28 @@ impl ConnectionPool {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if all replicas are banned, in that case unban all of them
|
// If we have replica to primary failover we should not unban replicas
|
||||||
let replicas_available = self.addresses[address.shard]
|
// as we still have the primary to server traffic.
|
||||||
.iter()
|
if !self.settings.replica_to_primary_failover_enabled {
|
||||||
.filter(|addr| addr.role == Role::Replica)
|
// Check if all replicas are banned, in that case unban all of them
|
||||||
.count();
|
let replicas_available = self.addresses[address.shard]
|
||||||
|
.iter()
|
||||||
|
.filter(|addr| addr.role == Role::Replica)
|
||||||
|
.count();
|
||||||
|
|
||||||
debug!("Available targets: {}", replicas_available);
|
debug!("Available targets: {}", replicas_available);
|
||||||
|
|
||||||
let read_guard = self.banlist.read();
|
let read_guard = self.banlist.read();
|
||||||
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
|
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
|
||||||
drop(read_guard);
|
drop(read_guard);
|
||||||
|
|
||||||
if all_replicas_banned {
|
if all_replicas_banned {
|
||||||
let mut write_guard = self.banlist.write();
|
let mut write_guard = self.banlist.write();
|
||||||
warn!("Unbanning all replicas.");
|
warn!("Unbanning all replicas.");
|
||||||
write_guard[address.shard].clear();
|
write_guard[address.shard].clear();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if ban time is expired
|
// Check if ban time is expired
|
||||||
|
|||||||
@@ -200,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {
|
|||||||
|
|
||||||
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
let formatted_labels = self
|
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
|
||||||
.labels
|
sorted_labels.sort_by_key(|&(key, _)| key);
|
||||||
|
let formatted_labels = sorted_labels
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join(",");
|
.join(",");
|
||||||
write!(
|
write!(
|
||||||
f,
|
f,
|
||||||
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
|
"{name}{{{formatted_labels}}} {value}",
|
||||||
name = format_args!("pgcat_{}", self.name),
|
name = format_args!("pgcat_{}", self.name),
|
||||||
help = self.help,
|
|
||||||
ty = self.ty,
|
|
||||||
formatted_labels = formatted_labels,
|
formatted_labels = formatted_labels,
|
||||||
value = self.value
|
value = self.value
|
||||||
)
|
)
|
||||||
@@ -247,7 +246,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("index", address.address_index.to_string());
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("user", address.username.clone());
|
labels.insert("username", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -264,7 +263,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("index", address.address_index.to_string());
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("user", address.username.clone());
|
labels.insert("username", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
labels.insert("index", address.address_index.to_string());
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("user", address.username.clone());
|
labels.insert("username", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -288,6 +288,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
|
|
||||||
Self::from_name(&format!("pools_{}", name), value, labels)
|
Self::from_name(&format!("pools_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_header(&self) -> String {
|
||||||
|
format!(
|
||||||
|
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
|
||||||
|
name = format_args!("pgcat_{}", self.name),
|
||||||
|
help = self.help,
|
||||||
|
ty = self.ty,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn prometheus_stats(
|
async fn prometheus_stats(
|
||||||
@@ -300,6 +309,7 @@ async fn prometheus_stats(
|
|||||||
push_pool_stats(&mut lines);
|
push_pool_stats(&mut lines);
|
||||||
push_server_stats(&mut lines);
|
push_server_stats(&mut lines);
|
||||||
push_database_stats(&mut lines);
|
push_database_stats(&mut lines);
|
||||||
|
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
|
||||||
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.header("content-type", "text/plain; version=0.0.4")
|
.header("content-type", "text/plain; version=0.0.4")
|
||||||
@@ -313,6 +323,7 @@ async fn prometheus_stats(
|
|||||||
|
|
||||||
// Adds metrics shown in a SHOW STATS admin command.
|
// Adds metrics shown in a SHOW STATS admin command.
|
||||||
fn push_address_stats(lines: &mut Vec<String>) {
|
fn push_address_stats(lines: &mut Vec<String>) {
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
@@ -322,7 +333,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_address(address, &key, value)
|
PrometheusMetric::<u64>::from_address(address, &key, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(key)
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -330,33 +344,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
||||||
fn push_pool_stats(lines: &mut Vec<String>) {
|
fn push_pool_stats(lines: &mut Vec<String>) {
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||||
let pool_stats = PoolStats::construct_pool_lookup();
|
let pool_stats = PoolStats::construct_pool_lookup();
|
||||||
for (pool_id, stats) in pool_stats.iter() {
|
for (pool_id, stats) in pool_stats.iter() {
|
||||||
for (name, value) in stats.clone() {
|
for (name, value) in stats.clone() {
|
||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(name)
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
||||||
fn push_database_stats(lines: &mut Vec<String>) {
|
fn push_database_stats(lines: &mut Vec<String>) {
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
let pool_config = pool.settings.clone();
|
let pool_config = pool.settings.clone();
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
let address = pool.address(shard, server);
|
let address = pool.address(shard, server);
|
||||||
let pool_state = pool.pool_state(shard, server);
|
let pool_state = pool.pool_state(shard, server);
|
||||||
|
|
||||||
let metrics = vec![
|
let metrics = vec![
|
||||||
("pool_size", pool_config.user.pool_size),
|
("pool_size", pool_config.user.pool_size),
|
||||||
("current_connections", pool_state.connections),
|
("current_connections", pool_state.connections),
|
||||||
@@ -365,7 +399,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(key.to_string())
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -373,6 +410,14 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||||
@@ -405,7 +450,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
@@ -428,7 +473,10 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(key.to_string())
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -437,6 +485,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||||
|
|||||||
@@ -386,6 +386,18 @@ impl QueryRouter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Determines if a query is a mutation or not.
|
||||||
|
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
|
||||||
|
use sqlparser::ast::*;
|
||||||
|
|
||||||
|
match q.body.as_ref() {
|
||||||
|
SetExpr::Insert(_) => true,
|
||||||
|
SetExpr::Update(_) => true,
|
||||||
|
SetExpr::Query(q) => Self::is_mutation_query(q),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Try to infer which server to connect to based on the contents of the query.
|
/// Try to infer which server to connect to based on the contents of the query.
|
||||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
||||||
if !self.pool_settings.query_parser_read_write_splitting {
|
if !self.pool_settings.query_parser_read_write_splitting {
|
||||||
@@ -428,8 +440,9 @@ impl QueryRouter {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let has_locks = !query.locks.is_empty();
|
let has_locks = !query.locks.is_empty();
|
||||||
|
let has_mutation = Self::is_mutation_query(query);
|
||||||
|
|
||||||
if has_locks {
|
if has_locks || has_mutation {
|
||||||
self.active_role = Some(Role::Primary);
|
self.active_role = Some(Role::Primary);
|
||||||
} else if !visited_write_statement {
|
} else if !visited_write_statement {
|
||||||
// If we already visited a write statement, we should be going to the primary.
|
// If we already visited a write statement, we should be going to the primary.
|
||||||
@@ -1048,6 +1061,11 @@ impl QueryRouter {
|
|||||||
self.active_shard
|
self.active_shard
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set active_role as the default_role specified in the pool.
|
||||||
|
pub fn set_default_role(&mut self) {
|
||||||
|
self.active_role = self.pool_settings.default_role;
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the current desired server role we should be talking to.
|
/// Get the current desired server role we should be talking to.
|
||||||
pub fn role(&self) -> Option<Role> {
|
pub fn role(&self) -> Option<Role> {
|
||||||
self.active_role
|
self.active_role
|
||||||
@@ -1113,6 +1131,26 @@ mod test {
|
|||||||
assert_eq!(qr.role(), None);
|
assert_eq!(qr.role(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_split_cte_queries() {
|
||||||
|
QueryRouter::setup();
|
||||||
|
let mut qr = QueryRouter::new();
|
||||||
|
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||||
|
qr.pool_settings.query_parser_enabled = true;
|
||||||
|
|
||||||
|
let query = simple_query(
|
||||||
|
"WITH t AS (
|
||||||
|
SELECT id FROM users WHERE name ILIKE '%ja%'
|
||||||
|
)
|
||||||
|
UPDATE user_languages
|
||||||
|
SET settings = '{}'
|
||||||
|
FROM t WHERE t.id = user_id;",
|
||||||
|
);
|
||||||
|
let ast = qr.parse(&query).unwrap();
|
||||||
|
assert!(qr.infer(&ast).is_ok());
|
||||||
|
assert_eq!(qr.role(), Some(Role::Primary));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_infer_replica() {
|
fn test_infer_replica() {
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
@@ -1399,6 +1437,19 @@ mod test {
|
|||||||
assert!(!qr.query_parser_enabled());
|
assert!(!qr.query_parser_enabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_query_parser() {
|
||||||
|
QueryRouter::setup();
|
||||||
|
let mut qr = QueryRouter::new();
|
||||||
|
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||||
|
|
||||||
|
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
|
||||||
|
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||||
|
|
||||||
|
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
|
||||||
|
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_update_from_pool_settings() {
|
fn test_update_from_pool_settings() {
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
@@ -1408,6 +1459,7 @@ mod test {
|
|||||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||||
shards: 2,
|
shards: 2,
|
||||||
user: crate::config::User::default(),
|
user: crate::config::User::default(),
|
||||||
|
replica_to_primary_failover_enabled: false,
|
||||||
default_role: Some(Role::Replica),
|
default_role: Some(Role::Replica),
|
||||||
query_parser_enabled: true,
|
query_parser_enabled: true,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
@@ -1487,6 +1539,7 @@ mod test {
|
|||||||
shards: 5,
|
shards: 5,
|
||||||
user: crate::config::User::default(),
|
user: crate::config::User::default(),
|
||||||
default_role: Some(Role::Replica),
|
default_role: Some(Role::Replica),
|
||||||
|
replica_to_primary_failover_enabled: false,
|
||||||
query_parser_enabled: true,
|
query_parser_enabled: true,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
query_parser_read_write_splitting: true,
|
query_parser_read_write_splitting: true,
|
||||||
|
|||||||
34
start_test_env.sh
Executable file
34
start_test_env.sh
Executable file
@@ -0,0 +1,34 @@
|
|||||||
|
GREEN="\033[0;32m"
|
||||||
|
RED="\033[0;31m"
|
||||||
|
BLUE="\033[0;34m"
|
||||||
|
RESET="\033[0m"
|
||||||
|
|
||||||
|
|
||||||
|
cd tests/docker/
|
||||||
|
docker compose kill main || true
|
||||||
|
docker compose build main
|
||||||
|
docker compose down
|
||||||
|
docker compose up -d
|
||||||
|
# wait for the container to start
|
||||||
|
while ! docker compose exec main ls; do
|
||||||
|
echo "Waiting for test environment to start"
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
echo "==================================="
|
||||||
|
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
|
||||||
|
docker compose exec --workdir /app main cargo build
|
||||||
|
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
|
||||||
|
docker compose exec --workdir /app/tests/ruby main bundle install
|
||||||
|
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
|
||||||
|
echo "Interactive test environment ready"
|
||||||
|
echo "To run integration tests, you can use the following commands:"
|
||||||
|
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||||
|
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
||||||
|
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||||
|
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||||
|
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||||
|
echo "You can rebuild PgCat from within the container by running"
|
||||||
|
echo -e " ${GREEN}cargo build${RESET}"
|
||||||
|
echo "and then run the tests again"
|
||||||
|
echo "==================================="
|
||||||
|
docker compose exec --workdir /app/tests main bash
|
||||||
@@ -1,4 +1,3 @@
|
|||||||
version: "3"
|
|
||||||
services:
|
services:
|
||||||
pg1:
|
pg1:
|
||||||
image: postgres:14
|
image: postgres:14
|
||||||
@@ -48,6 +47,8 @@ services:
|
|||||||
main:
|
main:
|
||||||
build: .
|
build: .
|
||||||
command: ["bash", "/app/tests/docker/run.sh"]
|
command: ["bash", "/app/tests/docker/run.sh"]
|
||||||
|
environment:
|
||||||
|
- INTERACTIVE_TEST_ENVIRONMENT=true
|
||||||
volumes:
|
volumes:
|
||||||
- ../../:/app/
|
- ../../:/app/
|
||||||
- /app/target/
|
- /app/target/
|
||||||
|
|||||||
@@ -5,6 +5,38 @@ rm /app/*.profraw || true
|
|||||||
rm /app/pgcat.profdata || true
|
rm /app/pgcat.profdata || true
|
||||||
rm -rf /app/cov || true
|
rm -rf /app/cov || true
|
||||||
|
|
||||||
|
# Prepares the interactive test environment
|
||||||
|
#
|
||||||
|
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
|
||||||
|
ports=(5432 7432 8432 9432 10432)
|
||||||
|
for port in "${ports[@]}"; do
|
||||||
|
is_it_up=0
|
||||||
|
attempts=0
|
||||||
|
while [ $is_it_up -eq 0 ]; do
|
||||||
|
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
|
||||||
|
if [ $? -eq 0 ]; then
|
||||||
|
echo "PostgreSQL on port $port is up."
|
||||||
|
is_it_up=1
|
||||||
|
else
|
||||||
|
attempts=$((attempts+1))
|
||||||
|
if [ $attempts -gt 10 ]; then
|
||||||
|
echo "PostgreSQL on port $port is down, giving up."
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "PostgreSQL on port $port is down, waiting for it to start."
|
||||||
|
sleep 1
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
done
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
sleep 100000000000000000
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
|
||||||
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
||||||
export RUSTC_BOOTSTRAP=1
|
export RUSTC_BOOTSTRAP=1
|
||||||
export CARGO_INCREMENTAL=0
|
export CARGO_INCREMENTAL=0
|
||||||
|
|||||||
@@ -1,2 +1,3 @@
|
|||||||
|
pytest
|
||||||
psycopg2==2.9.3
|
psycopg2==2.9.3
|
||||||
psutil==5.9.1
|
psutil==5.9.1
|
||||||
|
|||||||
71
tests/python/test_auth.py
Normal file
71
tests/python/test_auth.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
import utils
|
||||||
|
import signal
|
||||||
|
|
||||||
|
class TestTrustAuth:
|
||||||
|
@classmethod
|
||||||
|
def setup_method(cls):
|
||||||
|
config= """
|
||||||
|
[general]
|
||||||
|
host = "0.0.0.0"
|
||||||
|
port = 6432
|
||||||
|
admin_username = "admin_user"
|
||||||
|
admin_password = ""
|
||||||
|
admin_auth_type = "trust"
|
||||||
|
|
||||||
|
[pools.sharded_db.users.0]
|
||||||
|
username = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
auth_type = "trust"
|
||||||
|
pool_size = 10
|
||||||
|
min_pool_size = 1
|
||||||
|
pool_mode = "transaction"
|
||||||
|
|
||||||
|
[pools.sharded_db.shards.0]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432, "primary" ],
|
||||||
|
]
|
||||||
|
database = "shard0"
|
||||||
|
"""
|
||||||
|
utils.pgcat_generic_start(config)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def teardown_method(self):
|
||||||
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
def test_admin_trust_auth(self):
|
||||||
|
conn, cur = utils.connect_db_trust(admin=True)
|
||||||
|
cur.execute("SHOW POOLS")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
def test_normal_trust_auth(self):
|
||||||
|
conn, cur = utils.connect_db_trust(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
class TestMD5Auth:
|
||||||
|
@classmethod
|
||||||
|
def setup_method(cls):
|
||||||
|
utils.pgcat_start()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def teardown_method(self):
|
||||||
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
def test_normal_db_access(self):
|
||||||
|
conn, cur = utils.connect_db(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
def test_admin_db_access(self):
|
||||||
|
conn, cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
|
cur.execute("SHOW POOLS")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
@@ -1,84 +1,12 @@
|
|||||||
from typing import Tuple
|
|
||||||
import psycopg2
|
|
||||||
import psutil
|
|
||||||
import os
|
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
import utils
|
||||||
|
|
||||||
SHUTDOWN_TIMEOUT = 5
|
SHUTDOWN_TIMEOUT = 5
|
||||||
|
|
||||||
PGCAT_HOST = "127.0.0.1"
|
|
||||||
PGCAT_PORT = "6432"
|
|
||||||
|
|
||||||
|
|
||||||
def pgcat_start():
|
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
|
||||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
|
||||||
try:
|
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
|
||||||
if "pgcat" == proc.name():
|
|
||||||
os.kill(proc.pid, signal)
|
|
||||||
except Exception as e:
|
|
||||||
# The process can be gone when we send this signal
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
|
||||||
# Returns 0 if pgcat process exists
|
|
||||||
time.sleep(2)
|
|
||||||
if not os.system('pgrep pgcat'):
|
|
||||||
raise Exception("pgcat not closed after SIGTERM")
|
|
||||||
|
|
||||||
|
|
||||||
def connect_db(
|
|
||||||
autocommit: bool = True,
|
|
||||||
admin: bool = False,
|
|
||||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
|
||||||
|
|
||||||
if admin:
|
|
||||||
user = "admin_user"
|
|
||||||
password = "admin_pass"
|
|
||||||
db = "pgcat"
|
|
||||||
else:
|
|
||||||
user = "sharding_user"
|
|
||||||
password = "sharding_user"
|
|
||||||
db = "sharded_db"
|
|
||||||
|
|
||||||
conn = psycopg2.connect(
|
|
||||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
|
||||||
connect_timeout=2,
|
|
||||||
)
|
|
||||||
conn.autocommit = autocommit
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
return (conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
|
||||||
cur.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
def test_normal_db_access():
|
|
||||||
pgcat_start()
|
|
||||||
conn, cur = connect_db(autocommit=False)
|
|
||||||
cur.execute("SELECT 1")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def test_admin_db_access():
|
|
||||||
conn, cur = connect_db(admin=True)
|
|
||||||
|
|
||||||
cur.execute("SHOW POOLS")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def test_shutdown_logic():
|
def test_shutdown_logic():
|
||||||
|
|
||||||
@@ -86,17 +14,17 @@ def test_shutdown_logic():
|
|||||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and send query (not in transaction)
|
# Create client connection and send query (not in transaction)
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cur.execute("COMMIT;")
|
cur.execute("COMMIT;")
|
||||||
|
|
||||||
# Send sigint to pgcat
|
# Send sigint to pgcat
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||||
@@ -108,18 +36,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -138,24 +66,24 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH SIGINT
|
# HANDLE TRANSACTION WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||||
@@ -165,18 +93,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -194,30 +122,30 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
time_taken = time.perf_counter() - start
|
time_taken = time.perf_counter() - start
|
||||||
if time_taken > 0.1:
|
if time_taken > 0.1:
|
||||||
@@ -227,49 +155,49 @@ def test_shutdown_logic():
|
|||||||
else:
|
else:
|
||||||
raise Exception("Able connect to database during shutdown")
|
raise Exception("Able connect to database during shutdown")
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn, cur = connect_db(admin=True)
|
conn, cur = utils.connect_db(admin=True)
|
||||||
cur.execute("SHOW DATABASES;")
|
cur.execute("SHOW DATABASES;")
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception(e)
|
raise Exception(e)
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
admin_cur.execute("SHOW DATABASES;")
|
admin_cur.execute("SHOW DATABASES;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -277,24 +205,24 @@ def test_shutdown_logic():
|
|||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception("Could not execute admin command:", e)
|
raise Exception("Could not execute admin command:", e)
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
|
|
||||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||||
@@ -308,12 +236,7 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint and expected timeout")
|
raise Exception("Server not closed after sigint and expected timeout")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
|
||||||
test_normal_db_access()
|
|
||||||
test_admin_db_access()
|
|
||||||
test_shutdown_logic()
|
|
||||||
110
tests/python/utils.py
Normal file
110
tests/python/utils.py
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
from typing import Tuple
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import psutil
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
PGCAT_HOST = "127.0.0.1"
|
||||||
|
PGCAT_PORT = "6432"
|
||||||
|
|
||||||
|
|
||||||
|
def _pgcat_start(config_path: str):
|
||||||
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
os.system(f"./target/debug/pgcat {config_path} &")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_start():
|
||||||
|
_pgcat_start(config_path='.circleci/pgcat.toml')
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_generic_start(config: str):
|
||||||
|
tmp = tempfile.NamedTemporaryFile()
|
||||||
|
with open(tmp.name, 'w') as f:
|
||||||
|
f.write(config)
|
||||||
|
_pgcat_start(config_path=tmp.name)
|
||||||
|
|
||||||
|
|
||||||
|
def glauth_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if proc.name() == "glauth":
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep glauth'):
|
||||||
|
raise Exception("glauth not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if "pgcat" == proc.name():
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep pgcat'):
|
||||||
|
raise Exception("pgcat not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
|
def connect_db(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
password = "admin_pass"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
def connect_db_trust(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
@@ -1,22 +1,33 @@
|
|||||||
GEM
|
GEM
|
||||||
remote: https://rubygems.org/
|
remote: https://rubygems.org/
|
||||||
specs:
|
specs:
|
||||||
activemodel (7.0.4.1)
|
activemodel (7.1.4)
|
||||||
activesupport (= 7.0.4.1)
|
activesupport (= 7.1.4)
|
||||||
activerecord (7.0.4.1)
|
activerecord (7.1.4)
|
||||||
activemodel (= 7.0.4.1)
|
activemodel (= 7.1.4)
|
||||||
activesupport (= 7.0.4.1)
|
activesupport (= 7.1.4)
|
||||||
activesupport (7.0.4.1)
|
timeout (>= 0.4.0)
|
||||||
|
activesupport (7.1.4)
|
||||||
|
base64
|
||||||
|
bigdecimal
|
||||||
concurrent-ruby (~> 1.0, >= 1.0.2)
|
concurrent-ruby (~> 1.0, >= 1.0.2)
|
||||||
|
connection_pool (>= 2.2.5)
|
||||||
|
drb
|
||||||
i18n (>= 1.6, < 2)
|
i18n (>= 1.6, < 2)
|
||||||
minitest (>= 5.1)
|
minitest (>= 5.1)
|
||||||
|
mutex_m
|
||||||
tzinfo (~> 2.0)
|
tzinfo (~> 2.0)
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
concurrent-ruby (1.1.10)
|
base64 (0.2.0)
|
||||||
|
bigdecimal (3.1.8)
|
||||||
|
concurrent-ruby (1.3.4)
|
||||||
|
connection_pool (2.4.1)
|
||||||
diff-lcs (1.5.0)
|
diff-lcs (1.5.0)
|
||||||
i18n (1.12.0)
|
drb (2.2.1)
|
||||||
|
i18n (1.14.5)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
minitest (5.17.0)
|
minitest (5.25.1)
|
||||||
|
mutex_m (0.2.0)
|
||||||
parallel (1.22.1)
|
parallel (1.22.1)
|
||||||
parser (3.1.2.0)
|
parser (3.1.2.0)
|
||||||
ast (~> 2.4.1)
|
ast (~> 2.4.1)
|
||||||
@@ -24,7 +35,8 @@ GEM
|
|||||||
pg (1.3.2)
|
pg (1.3.2)
|
||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
regexp_parser (2.3.1)
|
regexp_parser (2.3.1)
|
||||||
rexml (3.2.5)
|
rexml (3.3.6)
|
||||||
|
strscan
|
||||||
rspec (3.11.0)
|
rspec (3.11.0)
|
||||||
rspec-core (~> 3.11.0)
|
rspec-core (~> 3.11.0)
|
||||||
rspec-expectations (~> 3.11.0)
|
rspec-expectations (~> 3.11.0)
|
||||||
@@ -50,10 +62,12 @@ GEM
|
|||||||
rubocop-ast (1.17.0)
|
rubocop-ast (1.17.0)
|
||||||
parser (>= 3.1.1.0)
|
parser (>= 3.1.1.0)
|
||||||
ruby-progressbar (1.11.0)
|
ruby-progressbar (1.11.0)
|
||||||
|
strscan (3.1.0)
|
||||||
|
timeout (0.4.1)
|
||||||
toml (0.3.0)
|
toml (0.3.0)
|
||||||
parslet (>= 1.8.0, < 3.0.0)
|
parslet (>= 1.8.0, < 3.0.0)
|
||||||
toxiproxy (2.0.1)
|
toxiproxy (2.0.1)
|
||||||
tzinfo (2.0.5)
|
tzinfo (2.0.6)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
unicode-display_width (2.1.0)
|
unicode-display_width (2.1.0)
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
require_relative 'spec_helper'
|
require_relative "spec_helper"
|
||||||
|
|
||||||
describe "Random Load Balancing" do
|
describe "Random Load Balancing" do
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||||
@@ -8,7 +8,7 @@ describe "Random Load Balancing" do
|
|||||||
processes.pgcat.shutdown
|
processes.pgcat.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
context "under regular circumstances" do
|
context("under regular circumstances") do
|
||||||
it "balances query volume between all instances" do
|
it "balances query volume between all instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
|
||||||
@@ -22,14 +22,14 @@ describe "Random Load Balancing" do
|
|||||||
failed_count += 1
|
failed_count += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to eq(0)
|
expect(failed_count).to(eq(0))
|
||||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when some replicas are down" do
|
context("when some replicas are down") do
|
||||||
it "balances query volume between working instances" do
|
it "balances query volume between working instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||||
@@ -49,9 +49,9 @@ describe "Random Load Balancing" do
|
|||||||
processes.all_databases.each do |instance|
|
processes.all_databases.each do |instance|
|
||||||
queries_routed = instance.count_select_1_plus_2
|
queries_routed = instance.count_select_1_plus_2
|
||||||
if processes.replicas[0..1].include?(instance)
|
if processes.replicas[0..1].include?(instance)
|
||||||
expect(queries_routed).to eq(0)
|
expect(queries_routed).to(eq(0))
|
||||||
else
|
else
|
||||||
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -65,7 +65,7 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
processes.pgcat.shutdown
|
processes.pgcat.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
context "under homogeneous load" do
|
context("under homogeneous load") do
|
||||||
it "balances query volume between all instances" do
|
it "balances query volume between all instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
|
||||||
@@ -79,15 +79,15 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
failed_count += 1
|
failed_count += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to eq(0)
|
expect(failed_count).to(eq(0))
|
||||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "under heterogeneous load" do
|
context("under heterogeneous load") do
|
||||||
xit "balances query volume between all instances based on how busy they are" do
|
xit("balances query volume between all instances based on how busy they are") do
|
||||||
slow_query_count = 2
|
slow_query_count = 2
|
||||||
threads = Array.new(slow_query_count) do
|
threads = Array.new(slow_query_count) do
|
||||||
Thread.new do
|
Thread.new do
|
||||||
@@ -108,31 +108,32 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
failed_count += 1
|
failed_count += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to eq(0)
|
expect(failed_count).to(eq(0))
|
||||||
# Under LOQ, we expect replicas running the slow pg_sleep
|
# Under LOQ, we expect replicas running the slow pg_sleep
|
||||||
# to get no selects
|
# to get no selects
|
||||||
expect(
|
expect(
|
||||||
processes.
|
processes
|
||||||
all_databases.
|
.all_databases
|
||||||
map(&:count_select_1_plus_2).
|
.map(&:count_select_1_plus_2)
|
||||||
count { |instance_share| instance_share == 0 }
|
.count { |instance_share| instance_share == 0 }
|
||||||
).to eq(slow_query_count)
|
)
|
||||||
|
.to(eq(slow_query_count))
|
||||||
|
|
||||||
# We also expect the quick queries to be spread across
|
# We also expect the quick queries to be spread across
|
||||||
# the idle servers only
|
# the idle servers only
|
||||||
processes.
|
processes
|
||||||
all_databases.
|
.all_databases
|
||||||
map(&:count_select_1_plus_2).
|
.map(&:count_select_1_plus_2)
|
||||||
reject { |instance_share| instance_share == 0 }.
|
.reject { |instance_share| instance_share == 0 }
|
||||||
each do |instance_share|
|
.each do |instance_share|
|
||||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||||
end
|
end
|
||||||
|
|
||||||
threads.map(&:join)
|
threads.map(&:join)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when some replicas are down" do
|
context("when some replicas are down") do
|
||||||
it "balances query volume between working instances" do
|
it "balances query volume between working instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||||
@@ -149,16 +150,106 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to be <= 2
|
expect(failed_count).to(be <= 2)
|
||||||
processes.all_databases.each do |instance|
|
processes.all_databases.each do |instance|
|
||||||
queries_routed = instance.count_select_1_plus_2
|
queries_routed = instance.count_select_1_plus_2
|
||||||
if processes.replicas[0..1].include?(instance)
|
if processes.replicas[0..1].include?(instance)
|
||||||
expect(queries_routed).to eq(0)
|
expect(queries_routed).to(eq(0))
|
||||||
else
|
else
|
||||||
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "Candidate filtering based on `default_pool`" do
|
||||||
|
let(:processes) {
|
||||||
|
Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", pool_settings)
|
||||||
|
}
|
||||||
|
|
||||||
|
after do
|
||||||
|
processes.all_databases.map(&:reset)
|
||||||
|
processes.pgcat.shutdown
|
||||||
|
end
|
||||||
|
|
||||||
|
context("with default_pool set to replicas") do
|
||||||
|
context("when all replicas are down ") do
|
||||||
|
let(:pool_settings) do
|
||||||
|
{
|
||||||
|
"default_role" => "replica",
|
||||||
|
"replica_to_primary_failover_enabled" => replica_to_primary_failover_enabled
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
context("with `replica_to_primary_failover_enabled` set to false`") do
|
||||||
|
let(:replica_to_primary_failover_enabled) { false }
|
||||||
|
|
||||||
|
it(
|
||||||
|
"unbans them automatically to prevent false positives in health checks that could make all replicas unavailable"
|
||||||
|
) do
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count = 0
|
||||||
|
number_of_replicas = processes[:replicas].length
|
||||||
|
|
||||||
|
# Take down all replicas
|
||||||
|
processes[:replicas].each(&:take_down)
|
||||||
|
|
||||||
|
(number_of_replicas + 1).times do |n|
|
||||||
|
conn.async_exec("SELECT 1 + 2")
|
||||||
|
rescue
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count += 1
|
||||||
|
end
|
||||||
|
|
||||||
|
expect(failed_count).to(eq(number_of_replicas + 1))
|
||||||
|
failed_count = 0
|
||||||
|
|
||||||
|
# Ban_time is configured to 60 so this reset will only work
|
||||||
|
# if the replicas are unbanned automatically
|
||||||
|
processes[:replicas].each(&:reset)
|
||||||
|
|
||||||
|
number_of_replicas.times do
|
||||||
|
conn.async_exec("SELECT 1 + 2")
|
||||||
|
rescue
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count += 1
|
||||||
|
end
|
||||||
|
|
||||||
|
expect(failed_count).to(eq(0))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context("with `replica_to_primary_failover_enabled` set to true`") do
|
||||||
|
let(:replica_to_primary_failover_enabled) { true }
|
||||||
|
|
||||||
|
it "does not unbans them automatically" do
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count = 0
|
||||||
|
number_of_replicas = processes[:replicas].length
|
||||||
|
|
||||||
|
# We need to allow pgcat to open connections to replicas
|
||||||
|
(number_of_replicas + 10).times do |n|
|
||||||
|
conn.async_exec("SELECT 1 + 2")
|
||||||
|
rescue
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count += 1
|
||||||
|
end
|
||||||
|
expect(failed_count).to(eq(0))
|
||||||
|
|
||||||
|
# Take down all replicas
|
||||||
|
processes[:replicas].each(&:take_down)
|
||||||
|
|
||||||
|
(number_of_replicas + 10).times do |n|
|
||||||
|
conn.async_exec("SELECT 1 + 2")
|
||||||
|
rescue
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count += 1
|
||||||
|
end
|
||||||
|
|
||||||
|
expect(failed_count).to(eq(number_of_replicas))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|||||||
682
tests/rust/Cargo.lock
generated
682
tests/rust/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -15,13 +15,11 @@ async fn test_prepared_statements() {
|
|||||||
for _ in 0..5 {
|
for _ in 0..5 {
|
||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = tokio::task::spawn(async move {
|
||||||
for _ in 0..1000 {
|
for i in 0..1000 {
|
||||||
match sqlx::query("SELECT one").fetch_all(&pool).await {
|
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if err.to_string().contains("prepared statement") {
|
panic!("prepared statement error: {}", err);
|
||||||
panic!("prepared statement error: {}", err);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user