Compare commits

..

4 Commits

Author SHA1 Message Date
Lev Kokotov
0d5feac4b2 Contributors (#384) 2023-03-24 17:12:12 -07:00
Lev Kokotov
90aba9c011 V1 (#383) 2023-03-24 17:10:12 -07:00
Montana Low
0f34b49503 point CI at updated repo 2023-03-24 12:59:03 -07:00
Zain Kabani
ca4431b67e Add idle client in transaction configuration (#380)
* Add idle client in transaction configuration

* fmt

* Update docs

* trigger build

* Add tests

* Make the config dynamic from reloads

* fmt

* comments

* trigger build

* fix config.md

* remove error
2023-03-24 08:20:30 -07:00
12 changed files with 538 additions and 756 deletions

View File

@@ -15,6 +15,6 @@ jobs:
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build CI Docker image
run: |
docker build . -f Dockerfile.ci --tag ghcr.io/levkk/pgcat-ci:latest
docker run ghcr.io/levkk/pgcat-ci:latest
docker push ghcr.io/levkk/pgcat-ci:latest
docker build . -f Dockerfile.ci --tag ghcr.io/postgresml/pgcat-ci:latest
docker run ghcr.io/postgresml/pgcat-ci:latest
docker push ghcr.io/postgresml/pgcat-ci:latest

View File

@@ -49,6 +49,14 @@ default: 30000 # milliseconds
How long an idle connection with a server is left open (ms).
### idle_client_in_transaction_timeout
```
path: general.idle_client_in_transaction_timeout
default: 0 # milliseconds
```
How long a client is allowed to be idle while in a transaction (ms).
### healthcheck_timeout
```
path: general.healthcheck_timeout

2
Cargo.lock generated
View File

@@ -716,7 +716,7 @@ dependencies = [
[[package]]
name = "pgcat"
version = "0.6.0-alpha1"
version = "1.0.0"
dependencies = [
"arc-swap",
"async-trait",

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "0.6.0-alpha1"
version = "1.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -1,4 +1,4 @@
Copyright (c) 2022 Lev Kokotov <lev@levthe.dev>
Copyright (c) 2023 PgCat Contributors
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the

398
README.md
View File

@@ -1,33 +1,47 @@
##### PgCat: PostgreSQL at petabyte scale
## PgCat: Nextgen PostgreSQL Pooler
[![CircleCI](https://circleci.com/gh/levkk/pgcat/tree/main.svg?style=svg)](https://circleci.com/gh/levkk/pgcat/tree/main)
[![CircleCI](https://circleci.com/gh/postgresml/pgcat/tree/main.svg?style=svg)](https://circleci.com/gh/postgresml/pgcat/tree/main)
<a href="https://discord.gg/DmyJP3qJ7U" target="_blank">
<img src="https://img.shields.io/discord/1013868243036930099" alt="Join our Discord!" />
</a>
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
**Beta**: looking for beta testers, see [#35](https://github.com/levkk/pgcat/issues/35).
PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
## Features
| **Feature** | **Status** | **Comments** |
|--------------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :white_check_mark: | Identical to PgBouncer. |
| Session pooling | :white_check_mark: | Identical to PgBouncer. |
| `COPY` support | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
| Query cancellation | :white_check_mark: | Supported both in transaction and session pooling modes. |
| Load balancing of read queries | :white_check_mark: | Using random between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
| Statistics | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. |
| Live configuration reloading | :white_check_mark: | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. |
| Client authentication | :white_check_mark: :wrench: | MD5 password authentication is supported, SCRAM is on the roadmap; one user is used to connect to Postgres with both SCRAM and MD5 supported. |
| Admin database | :white_check_mark: | The admin database, similar to PgBouncer's, allows to query for statistics and reload the configuration. |
| **Feature** | **Status** | **Comments** |
|-------------|------------|--------------|
| Transaction pooling | **Stable** | Identical to PgBouncer with notable improvements for handling bad clients and abandoned transactions. |
| Session pooling | **Stable** | Identical to PgBouncer. |
| Multi-threaded runtime | **Stable** | Using Tokio asynchronous runtime, the pooler takes advantage of multicore machines. |
| Load balancing of read queries | **Stable** | Queries are automatically load balanced between replicas and the primary. |
| Failover | **Stable** | Queries are automatically rerouted around broken replicas, validated by regular health checks. |
| Admin database statistics | **Stable** | Pooler statistics and administration via the `pgbouncer` and `pgcat` databases. |
| Prometheus statistics | **Stable** | Statistics are reported via a HTTP endpoint for Prometheus. |
| Client TLS | **Stable** | Clients can connect to the pooler using TLS/SSL. |
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
| Sharding using extended SQL syntax | **Experimental** | Clients can dynamically configure the pooler to route queries to specific shards. |
| Sharding using comments parsing/Regex | **Experimental** | Clients can include shard information (sharding key, shard ID) in the query comments. |
| Automatic sharding | **Experimental** | PgCat can parse queries detect sharding keys automatically, and route queries to the route shard. |
| Mirroring | **Experimental** | Mirror queries between multiple databases in order to test servers with realistic production traffic. |
## Status
PgCat is stable and used in production to serve hundreds of thousands of queries per second. Some features remain experimental and are being actively developed. They are optional and can be enabled through configuration.
| | |
|-|-|
|<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f"><img src="./images/instacart.webp" height="70" width="auto"></a>|<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second"><img src="./images/postgresml.webp" height="70" width="auto"></a>|
| [Instacart](https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f) | [PostgresML](https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second) |
## Deployment
See `Dockerfile` for example deployment using Docker. The pooler is configured to spawn 4 workers so 4 CPUs are recommended for optimal performance. That setting can be adjusted to spawn as many (or as little) workers as needed.
A Docker image is available from `docker pull ghcr.io/postgresml/pgcat:latest`. See our [Github packages repository](https://github.com/postgresml/pgcat/pkgs/container/pgcat).
For quick local example, use the Docker Compose environment provided:
```bash
@@ -39,9 +53,13 @@ PGPASSWORD=postgres psql -h 127.0.0.1 -p 6432 -U postgres -c 'SELECT 1'
### Config
See [Configurations page](https://github.com/levkk/pgcat/blob/main/CONFIG.md)
See **[Configuration](https://github.com/levkk/pgcat/blob/main/CONFIG.md)**.
## Local development
## Contributing
The project is being actively developed and looking for additional contributors and production deployments.
### Local development
1. Install Rust (latest stable will work great).
2. `cargo build --release` (to get better benchmarks).
@@ -51,7 +69,7 @@ See [Configurations page](https://github.com/levkk/pgcat/blob/main/CONFIG.md)
### Tests
Quickest way to test your changes is to use pgbench:
When making substantial modifications to the protocol implementation, make sure to test them with pgbench:
```
pgbench -i -h 127.0.0.1 -p 6432 && \
@@ -61,36 +79,26 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
Run `cargo test` to run Rust tests.
Additionally, all features are tested with Ruby, Python, and Rust tests unit and integration tests.
Run `cargo test` to run Rust unit tests.
Run the following commands to run Ruby and Python integration tests:
Run the following commands to run Integration tests locally.
```
cd tests/docker/
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
```
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
| Session pooling | :white_check_mark: | :white_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. |
| `COPY` | :white_check_mark: | :white_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
| Query cancellation | :white_check_mark: | :white_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
| Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. |
| Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. |
| Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. |
| Statistics | :white_check_mark: | :white_check_mark: | Query the admin database with `psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS'`. |
| Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
### Docker-based local development
### Dev
Also, you can open a 'dev' environment where you can debug tests easier by running the following command:
You can open a Docker development environment where you can debug tests easier. Run the following command to spin it up:
```
./dev/script/console
```
This will open a terminal in an environment similar to that used in tests. In there you can compile, run tests, do some debugging with the test environment, etc. Objects
compiled inside the contaner (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have in your host.
This will open a terminal in an environment similar to that used in tests. In there, you can compile the pooler, run tests, do some debugging with the test environment, etc. Objects compiled inside the contaner (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have on your machine.
## Usage
@@ -105,11 +113,9 @@ In transaction mode, a client talks to one server for the duration of a single t
This mode is enabled by default.
### Load balancing of read queries
All queries are load balanced against the configured servers using the random algorithm. The most straight forward configuration example would be to put this pooler in front of several replicas and let it load balance all queries.
All queries are load balanced against the configured servers using either the random or least open connections algorithms. The most straightforward configuration example would be to put this pooler in front of several replicas and let it load balance all queries.
If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser will interpret the query and route all `SELECT` queries to a replica, while all other queries including explicit transactions will be routed to the primary.
The query parser is disabled by default.
If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser, implemented with the `sqlparser` crate, will interpret the query and route all `SELECT` queries to a replica, while all other queries including explicit transactions will be routed to the primary.
#### Query parser
The query parser will do its best to determine where the query should go, but sometimes that's not possible. In that case, the client can select which server it wants using this custom SQL syntax:
@@ -136,38 +142,14 @@ The setting will persist until it's changed again or the client disconnects.
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
### Failover
All servers are checked with a `SELECT 1` query before being given to a client. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
The ban time can be changed with `ban_time`. The default is 60 seconds.
Failover behavior can get pretty interesting (read complex) when multiple configurations and factors are involved. The table below will try to explain what PgCat does in each scenario:
| **Query** | **`SET SERVER ROLE TO`** | **`query_parser_enabled`** | **`primary_reads_enabled`** | **Target state** | **Outcome** |
|---------------------------|--------------------------|----------------------------|-----------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Read query, i.e. `SELECT` | unset (any) | false | false | up | Query is routed to the first instance in the random loop. |
| Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the random loop. |
| Read query | unset (any) | true | true | up | Query is routed to the first instance in the random loop. |
| Read query | replica | false | false | up | Query is routed to the first replica instance in the random loop. |
| Read query | primary | false | false | up | Query is routed to the primary. |
| Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the random loop is attempted. |
| Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
| Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the random loop. |
| Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
| Read query | primary | false | false | down | The query is attempted against the primary and fails. The client receives an error. |
| | | | | | |
| Write query e.g. `INSERT` | unset (any) | false | false | up | The query is attempted against the first available instance in the random loop. If the instance is a replica, the query fails and the client receives an error. |
| Write query | unset (any) | true | false | up | The query is routed to the primary. |
| Write query | unset (any) | true | true | up | The query is routed to the primary. |
| Write query | primary | false | false | up | The query is routed to the primary. |
| Write query | replica | false | false | up | The query is routed to the replica and fails. The client receives an error. |
| Write query | unset (any) | true | false | down | The query is routed to the primary and fails. The client receives an error. |
| Write query | unset (any) | true | true | down | The query is routed to the primary and fails. The client receives an error. |
| Write query | primary | false | false | down | The query is routed to the primary and fails. The client receives an error. |
| | | | | | |
### Sharding
We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
#### Extended syntax
To route queries to a particular shard, we use this custom SQL syntax:
```sql
@@ -182,7 +164,8 @@ The active shard will last until it's changed again or the client disconnects. B
For hash function implementation, see `src/sharding.rs` and `tests/sharding/partition_hash_test_setup.sql`.
#### ActiveRecord/Rails
##### ActiveRecord/Rails
```ruby
class User < ActiveRecord::Base
@@ -210,7 +193,7 @@ User.connection.execute "SET SERVER ROLE TO 'auto'"
User.find_by_email("test@example.com")
```
#### Raw SQL
##### Raw SQL
```sql
-- Grab a bunch of users from shard 1
@@ -230,268 +213,45 @@ SET SERVER ROLE TO 'auto'; -- let the query router figure out where the query sh
SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts until set again; we are reading from the primary
```
#### With comments
Issuing queries to the pooler can cause additional latency. To reduce its impact, it's possible to include sharding information inside SQL comments sent via the query. This is reasonably easy to implement with ORMs like [ActiveRecord](https://api.rubyonrails.org/classes/ActiveRecord/QueryMethods.html#method-i-annotate) and [SQLAlchemy](https://docs.sqlalchemy.org/en/20/core/events.html#sql-execution-and-connection-events).
```
/* shard_id: 5 */ SELECT * FROM foo WHERE id = 1234;
/* sharding_key: 1234 */ SELECT * FROM foo WHERE id = 1234;
```
#### Automatic query parsing
PgCat can use the `sqlparser` crate to parse SQL queries and extract the sharding key. This is configurable with the `automatic_sharding_key` setting. This feature is still experimental, but it's the ideal implementation for sharding, requiring no client modifications.
### Statistics reporting
The stats are very similar to what Pgbouncer reports and the names are kept to be comparable. They are accessible by querying the admin database `pgcat`, and `pgbouncer` for compatibility.
The stats are very similar to what PgBouncer reports and the names are kept to be comparable. They are accessible by querying the admin database `pgcat`, and `pgbouncer` for compatibility.
```
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
```
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
### Live configuration reloading
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. Not all settings are currently supported by live reload:
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
| **Config** | **Requires restart** |
|-------------------------|----------------------|
| `host` | yes |
| `port` | yes |
| `pool_mode` | no |
| `connect_timeout` | yes |
| `healthcheck_timeout` | no |
| `shutdown_timeout` | no |
| `healthcheck_delay` | no |
| `ban_time` | no |
| `user` | yes |
| `shards` | yes |
| `default_role` | no |
| `primary_reads_enabled` | no |
| `query_parser_enabled` | no |
### Mirroring
Mirroring allows to route queries to multiple databases at the same time. This is useful for prewarning replicas before placing them into the active configuration, or for testing different versions of Postgres with live traffic.
## Benchmarks
## License
You can setup PgBench locally through PgCat:
PgCat is free and open source, released under the MIT license.
```
pgbench -h 127.0.0.1 -p 6432 -i
```
## Contributors
Coincidenly, this uses `COPY` so you can test if that works. Additionally, we'll be running the following PgBench configurations:
Many thanks to our amazing contributors!
1. 16 clients, 2 threads
2. 32 clients, 2 threads
3. 64 clients, 2 threads
4. 128 clients, 2 threads
<a href = "https://github.com/postgresml/pgcat/graphs/contributors">
<img src = "https://contrib.rocks/image?repo=postgresml/pgcat"/>
</a>
All queries will be `SELECT` only (`-S`) just so disks don't get in the way, since the dataset will be effectively all in RAM.
My setup:
- 8 cores, 16 hyperthreaded (AMD Ryzen 5800X)
- 32GB RAM (doesn't matter for this benchmark, except to prove that Postgres will fit the whole dataset into RAM)
### PgBouncer
#### Config
```ini
[databases]
shard0 = host=localhost port=5432 user=sharding_user password=sharding_user
[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
```
Everything else stays default.
#### Runs
```
$ pgbench -t 1000 -c 16 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 16
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 16000/16000
latency average = 0.155 ms
tps = 103417.377469 (including connections establishing)
tps = 103510.639935 (excluding connections establishing)
$ pgbench -t 1000 -c 32 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 32
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 32000/32000
latency average = 0.290 ms
tps = 110325.939785 (including connections establishing)
tps = 110386.513435 (excluding connections establishing)
$ pgbench -t 1000 -c 64 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 64
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 64000/64000
latency average = 0.692 ms
tps = 92470.427412 (including connections establishing)
tps = 92618.389350 (excluding connections establishing)
$ pgbench -t 1000 -c 128 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 128
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 128000/128000
latency average = 1.406 ms
tps = 91013.429985 (including connections establishing)
tps = 91067.583928 (excluding connections establishing)
```
### PgCat
#### Config
The only thing that matters here is the number of workers in the Tokio pool. Make sure to set it to < than the number of your CPU cores.
Also account for hyper-threading, so if you have that, take the number you got above and divide it by two, that way only "real" cores serving
requests.
My setup is 16 threads, 8 cores (`htop` shows as 16 CPUs), so I set the `max_workers` in Tokio to 4. Too many, and it starts conflicting with PgBench
which is also running on the same system.
#### Runs
```
$ pgbench -t 1000 -c 16 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 16
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 16000/16000
latency average = 0.164 ms
tps = 97705.088232 (including connections establishing)
tps = 97872.216045 (excluding connections establishing)
$ pgbench -t 1000 -c 32 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 32
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 32000/32000
latency average = 0.288 ms
tps = 111300.488119 (including connections establishing)
tps = 111413.107800 (excluding connections establishing)
$ pgbench -t 1000 -c 64 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 64
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 64000/64000
latency average = 0.556 ms
tps = 115190.496139 (including connections establishing)
tps = 115247.521295 (excluding connections establishing)
$ pgbench -t 1000 -c 128 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 128
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 128000/128000
latency average = 1.135 ms
tps = 112770.562239 (including connections establishing)
tps = 112796.502381 (excluding connections establishing)
```
### Direct Postgres
Always good to have a base line.
#### Runs
```
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 16
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 16000/16000
latency average = 0.115 ms
tps = 139443.955722 (including connections establishing)
tps = 142314.859075 (excluding connections establishing)
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 32
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 32000/32000
latency average = 0.212 ms
tps = 150644.840891 (including connections establishing)
tps = 152218.499430 (excluding connections establishing)
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 64
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 64000/64000
latency average = 0.420 ms
tps = 152517.663404 (including connections establishing)
tps = 153319.188482 (excluding connections establishing)
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
query mode: extended
number of clients: 128
number of threads: 2
number of transactions per client: 1000
number of transactions actually processed: 128000/128000
latency average = 0.854 ms
tps = 149818.594087 (including connections establishing)
tps = 150200.603049 (excluding connections establishing)
```

BIN
images/instacart.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 KiB

BIN
images/postgresml.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

View File

@@ -23,6 +23,9 @@ connect_timeout = 5000 # milliseconds
# How long an idle connection with a server is left open (ms).
idle_timeout = 30000 # milliseconds
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout = 0 # milliseconds
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000 # milliseconds

View File

@@ -1,15 +1,10 @@
use crate::errors::Error;
use crate::pool::BanReason;
use bb8::PooledConnection;
/// Handle clients by pretending to be a PostgreSQL server.
use bytes::{Buf, BufMut, BytesMut};
use hyper::server::conn;
use log::{debug, error, info, trace, warn};
use crate::pool::ServerPool;
use crate::config::Role;
use std::collections::HashMap;
use std::ops::Add;
use std::time::Instant;
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
@@ -17,7 +12,7 @@ use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::config::{get_config, Address, PoolMode};
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
use crate::constants::*;
use crate::messages::*;
@@ -36,19 +31,6 @@ enum ClientConnectionType {
CancelQuery,
}
struct RetryBuffer {
buffer: BytesMut,
retry_count: u32,
}
pub enum ClientFlowControl {
Retry,
PerformNextCommand,
ReleaseConnection,
Disconnect
}
/// The client state. One of these is created per client.
pub struct Client<S, T> {
/// The reads are buffered (8K by default).
@@ -110,8 +92,6 @@ pub struct Client<S, T> {
/// Used to notify clients about an impending shutdown
shutdown: Receiver<()>,
retry_buffer: Option<RetryBuffer>,
}
/// Client entrypoint.
@@ -578,7 +558,6 @@ where
application_name: application_name.to_string(),
shutdown,
connected_to_server: false,
retry_buffer: None,
})
}
@@ -613,184 +592,155 @@ where
application_name: String::from("undefined"),
shutdown,
connected_to_server: false,
retry_buffer: None,
})
}
/// Handle a connected and authenticated client.
pub async fn handle(&mut self) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
trace!("Sending CancelRequest");
async fn cancel_query(&mut self) -> Result<(), Error> {
trace!("Sending CancelRequest");
let (process_id, secret_key, address, port) = {
let guard = self.client_server_map.lock();
let (process_id, secret_key, address, port) = {
let guard = self.client_server_map.lock();
match guard.get(&(self.process_id, self.secret_key)) {
// Drop the mutex as soon as possible.
// We found the server the client is using for its query
// that it wants to cancel.
Some((process_id, secret_key, address, port)) => {
(*process_id, *secret_key, address.clone(), *port)
}
match guard.get(&(self.process_id, self.secret_key)) {
// Drop the mutex as soon as possible.
// We found the server the client is using for its query
// that it wants to cancel.
Some((process_id, secret_key, address, port)) => {
(*process_id, *secret_key, address.clone(), *port)
// The client doesn't know / got the wrong server,
// we're closing the connection for security reasons.
None => return Ok(()),
}
};
// The client doesn't know / got the wrong server,
// we're closing the connection for security reasons.
None => return Ok(()),
}
};
// Opens a new separate connection to the server, sends the backend_id
// and secret_key and then closes it for security reasons. No other interactions
// take place.
return Server::cancel(&address, port, process_id, secret_key).await;
}
// Opens a new separate connection to the server, sends the backend_id
// and secret_key and then closes it for security reasons. No other interactions
// take place.
return Server::cancel(&address, port, process_id, secret_key).await;
}
async fn checkout_connection(&mut self, pool: &ConnectionPool, query_router: &mut QueryRouter) -> Result<(PooledConnection<ServerPool>, Address), Error> {
// Grab a server from the pool.
let mut connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id)
.await
{
Ok(conn) => {
debug!("Got connection from pool");
conn
}
Err(err) => {
self.buffer.clear();
error_response(&mut self.write, "could not get connection from the pool")
.await?;
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
return Err(err);
}
};
let server = &mut connection.0;
let address = connection.1.clone();
// Server is assigned to the client in case the client wants to
// cancel a query later.
server.claim(self.process_id, self.secret_key);
self.connected_to_server = true;
// Update statistics
self.stats
.client_active(self.process_id, server.server_id());
self.last_address_id = Some(address.id);
self.last_server_id = Some(server.server_id());
debug!(
"Client {:?} talking to server {:?}",
self.addr,
server.address()
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
self.stats.client_register(
self.process_id,
self.pool_name.clone(),
self.username.clone(),
self.application_name.clone(),
);
return Ok(connection);
}
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
loop {
trace!(
"Client idle, waiting for message, transaction mode: {}",
self.transaction_mode
);
async fn client_proc(&mut self, query_router: &mut QueryRouter) -> Result<ClientFlowControl, Error> {
// Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol).
// We can parse it here before grabbing a server from the pool,
// in case the client is sending some custom protocol messages, e.g.
// SET SHARDING KEY TO 'bigint';
trace!(
"Client idle, waiting for message, transaction mode: {}",
self.transaction_mode
);
let message = tokio::select! {
_ = self.shutdown.recv() => {
if !self.admin {
error_response_terminal(
&mut self.write,
"terminating connection due to administrator command"
).await?;
return Ok(())
}
// Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol).
// We can parse it here before grabbing a server from the pool,
// in case the client is sending some custom protocol messages, e.g.
// SET SHARDING KEY TO 'bigint';
let message = tokio::select! {
_ = self.shutdown.recv() => {
if !self.admin {
error_response_terminal(
&mut self.write,
"terminating connection due to administrator command"
).await?;
return Ok(ClientFlowControl::Disconnect)
} else {
// Admin clients ignore shutdown.
read_message(&mut self.read).await?
}
},
else {
read_message(&mut self.read).await?
}
},
message_result = read_message(&mut self.read) => message_result?
};
message_result = read_message(&mut self.read) => message_result?
};
match message[0] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
// we'll be able to allocate a connection. Also, clients do not expect
// the server to respond to these messages so even if we were not able to
// allocate a connection, we wouldn't be able to send back an error message
// to the client so we buffer them and defer the decision to error out or not
// to when we get the S message
'D' | 'E' => {
self.buffer.put(&message[..]);
return Ok(ClientFlowControl::PerformNextCommand);
}
'Q' => {
if query_router.query_parser_enabled() {
query_router.infer(&message);
}
}
'P' => {
self.buffer.put(&message[..]);
if query_router.query_parser_enabled() {
query_router.infer(&message);
match message[0] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
// we'll be able to allocate a connection. Also, clients do not expect
// the server to respond to these messages so even if we were not able to
// allocate a connection, we wouldn't be able to send back an error message
// to the client so we buffer them and defer the decision to error out or not
// to when we get the S message
'D' | 'E' => {
self.buffer.put(&message[..]);
continue;
}
return Ok(ClientFlowControl::PerformNextCommand);
}
'B' => {
self.buffer.put(&message[..]);
if query_router.query_parser_enabled() {
query_router.infer_shard_from_bind(&message);
'Q' => {
if query_router.query_parser_enabled() {
query_router.infer(&message);
}
}
return Ok(ClientFlowControl::PerformNextCommand);
'P' => {
self.buffer.put(&message[..]);
if query_router.query_parser_enabled() {
query_router.infer(&message);
}
continue;
}
'B' => {
self.buffer.put(&message[..]);
if query_router.query_parser_enabled() {
query_router.infer_shard_from_bind(&message);
}
continue;
}
'X' => {
debug!("Client disconnecting");
return Ok(());
}
_ => (),
}
'X' => {
debug!("Client disconnecting");
return Ok(ClientFlowControl::Disconnect);
// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}
_ => (),
}
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;
// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
return Ok(ClientFlowControl::PerformNextCommand);
}
// Check if the pool is paused and wait until it's resumed.
if pool.wait_paused().await {
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
}
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;
query_router.update_pool_settings(pool.settings.clone());
// Check if the pool is paused and wait until it's resumed.
if pool.wait_paused().await {
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
}
let current_shard = query_router.shard();
query_router.update_pool_settings(pool.settings.clone());
// Handle all custom protocol commands, if any.
match query_router.try_execute_command(&message) {
// Normal query, not a custom command.
None => (),
let current_shard = query_router.shard();
// Handle all custom protocol commands, if any.
match query_router.try_execute_command(&message) {
// Normal query, not a custom command.
None => (),
// SET SHARD TO
Some((Command::SetShard, _)) => {
// Selected shard is not configured.
@@ -811,48 +761,97 @@ where
} else {
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
}
return Ok(ClientFlowControl::PerformNextCommand);
continue;
}
// SET PRIMARY READS TO
Some((Command::SetPrimaryReads, _)) => {
custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?;
return Ok(ClientFlowControl::PerformNextCommand);
continue;
}
// SET SHARDING KEY TO
Some((Command::SetShardingKey, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
return Ok(ClientFlowControl::PerformNextCommand);
continue;
}
// SET SERVER ROLE TO
Some((Command::SetServerRole, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
return Ok(ClientFlowControl::PerformNextCommand);
continue;
}
// SHOW SERVER ROLE
Some((Command::ShowServerRole, value)) => {
show_response(&mut self.write, "server role", &value).await?;
return Ok(ClientFlowControl::PerformNextCommand);
continue;
}
// SHOW SHARD
Some((Command::ShowShard, value)) => {
show_response(&mut self.write, "shard", &value).await?;
return Ok(ClientFlowControl::PerformNextCommand);
continue;
}
// SHOW PRIMARY READS
Some((Command::ShowPrimaryReads, value)) => {
show_response(&mut self.write, "primary reads", &value).await?;
return Ok(ClientFlowControl::PerformNextCommand);
continue;
}
};
debug!("Waiting for connection from pool");
// Grab a server from the pool.
let connection = match pool
.get(query_router.shard(), query_router.role(), self.process_id)
.await
{
Ok(conn) => {
debug!("Got connection from pool");
conn
}
Err(err) => {
// Client is attempting to get results from the server,
// but we were unable to grab a connection from the pool
// We'll send back an error message and clean the extended
// protocol buffer
if message[0] as char == 'S' {
error!("Got Sync message but failed to get a connection from the pool");
self.buffer.clear();
}
error_response(&mut self.write, "could not get connection from the pool")
.await?;
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
continue;
}
};
let mut reference = connection.0;
let address = connection.1;
let server = &mut *reference;
// Server is assigned to the client in case the client wants to
// cancel a query later.
server.claim(self.process_id, self.secret_key);
self.connected_to_server = true;
// Update statistics
self.stats
.client_active(self.process_id, server.server_id());
self.last_address_id = Some(address.id);
self.last_server_id = Some(server.server_id());
debug!(
"Client {:?} talking to server {:?}",
self.addr,
server.address()
);
// TODO: investigate other parameters and set them too.
// Set application_name.
@@ -860,6 +859,11 @@ where
let mut initial_message = Some(message);
let idle_client_timeout_duration = match get_idle_client_in_transaction_timeout() {
0 => tokio::time::Duration::MAX,
timeout => tokio::time::Duration::from_millis(timeout),
};
// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode.
@@ -867,19 +871,189 @@ where
// If the client is in session mode, no more custom protocol
// commands will be accepted.
loop {
match self.tx_proc(server, &pool, initial_message).await {
Ok(control_flow) => {
match control_flow {
ClientFlowControl::PerformNextCommand => {
initial_message = None;
continue;
let message = match initial_message {
None => {
trace!("Waiting for message inside transaction or in session mode");
match tokio::time::timeout(
idle_client_timeout_duration,
read_message(&mut self.read),
)
.await
{
Ok(Ok(message)) => message,
Ok(Err(err)) => {
// Client disconnected inside a transaction.
// Clean up the server and re-use it.
server.checkin_cleanup().await?;
return Err(err);
}
control_flow_result => {
return Ok(control_flow_result);
Err(_) => {
// Client idle in transaction timeout
error_response(&mut self.write, "idle transaction timeout").await?;
error!("Client idle in transaction timeout: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\"}}", self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role());
break;
}
}
},
Err(err) => return Err(err),
}
Some(message) => {
initial_message = None;
message
}
};
// The message will be forwarded to the server intact. We still would like to
// parse it below to figure out what to do with it.
// Safe to unwrap because we know this message has a certain length and has the code
// This reads the first byte without advancing the internal pointer and mutating the bytes
let code = *message.get(0).unwrap() as char;
trace!("Message: {}", code);
match code {
// Query
'Q' => {
debug!("Sending query to server");
self.send_and_receive_loop(code, Some(&message), server, &address, &pool)
.await?;
if !server.in_transaction() {
// Report transaction executed statistics.
self.stats.transaction(self.process_id, server.server_id());
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
break;
}
}
}
// Terminate
'X' => {
server.checkin_cleanup().await?;
self.release();
return Ok(());
}
// Parse
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
'P' => {
self.buffer.put(&message[..]);
}
// Bind
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
'B' => {
self.buffer.put(&message[..]);
}
// Describe
// Command a client can issue to describe a previously prepared named statement.
'D' => {
self.buffer.put(&message[..]);
}
// Execute
// Execute a prepared statement prepared in `P` and bound in `B`.
'E' => {
self.buffer.put(&message[..]);
}
// Sync
// Frontend (client) is asking for the query result now.
'S' => {
debug!("Sending query to server");
self.buffer.put(&message[..]);
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
// Almost certainly true
if first_message_code == 'P' {
// Message layout
// P followed by 32 int followed by null-terminated statement name
// So message code should be in offset 0 of the buffer, first character
// in prepared statement name would be index 5
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
if first_char_in_name != 0 {
// This is a named prepared statement
// Server connection state will need to be cleared at checkin
server.mark_dirty();
}
}
self.send_and_receive_loop(code, None, server, &address, &pool)
.await?;
self.buffer.clear();
if !server.in_transaction() {
self.stats.transaction(self.process_id, server.server_id());
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
break;
}
}
}
// CopyData
'd' => {
self.buffer.put(&message[..]);
// Want to limit buffer size
if self.buffer.len() > 8196 {
// Forward the data to the server,
self.send_server_message(server, &self.buffer, &address, &pool)
.await?;
self.buffer.clear();
}
}
// CopyDone or CopyFail
// Copy is done, successfully or not.
'c' | 'f' => {
// We may already have some copy data in the buffer, add this message to buffer
self.buffer.put(&message[..]);
self.send_server_message(server, &self.buffer, &address, &pool)
.await?;
// Clear the buffer
self.buffer.clear();
let response = self.receive_server_message(server, &address, &pool).await?;
match write_all_half(&mut self.write, &response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.in_transaction() {
self.stats.transaction(self.process_id, server.server_id());
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
break;
}
}
}
// Some unexpected message. We either did not implement the protocol correctly
// or this is not a Postgres client we're talking to.
_ => {
error!("Unexpected code: {}", code);
}
}
}
@@ -892,249 +1066,6 @@ where
self.release();
self.stats.client_idle(self.process_id);
}
#[inline(always)]
pub async fn tx_proc(&mut self, server: &mut Server, pool: &ConnectionPool, initial_message: Option<BytesMut> ) -> Result<ClientFlowControl, Error> {
let message = match initial_message {
None => {
trace!("Waiting for message inside transaction or in session mode");
match read_message(&mut self.read).await {
Ok(message) => message,
Err(err) => {
// Client disconnected inside a transaction.
// Clean up the server and re-use it.
server.checkin_cleanup().await?;
return Err(err);
}
}
}
Some(message) => message
};
// The message will be forwarded to the server intact. We still would like to
// parse it below to figure out what to do with it.
// Safe to unwrap because we know this message has a certain length and has the code
// This reads the first byte without advancing the internal pointer and mutating the bytes
let code = *message.get(0).unwrap() as char;
let address = server.address();
trace!("Message: {}", code);
match code {
// Query
'Q' => {
debug!("Sending query to server");
match self.send_and_receive_loop(code, Some(&message), server, &address, &pool).await {
Ok(_) => self.retry_buffer = None,
Err(_) => {
if server.is_bad() && !server.in_transaction() && server.address().role == Role::Replica {
match self.retry_buffer {
Some(ref mut retry_buffer) => {
if retry_buffer.retry_count < 3 {
retry_buffer.retry_count += 1;
return Ok(ClientFlowControl::Retry);
}
},
None => {
self.retry_buffer = Some(RetryBuffer { buffer: message, retry_count: 0 });
return Ok(ClientFlowControl::Retry);
}
}
}
},
}
if !server.in_transaction() {
// Report transaction executed statistics.
self.stats.transaction(self.process_id, server.server_id());
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
return Ok(ClientFlowControl::ReleaseConnection);
}
}
}
// Terminate
'X' => {
server.checkin_cleanup().await?;
self.release();
return Ok(ClientFlowControl::Disconnect);
}
// Parse
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
'P' => {
self.buffer.put(&message[..]);
}
// Bind
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
'B' => {
self.buffer.put(&message[..]);
}
// Describe
// Command a client can issue to describe a previously prepared named statement.
'D' => {
self.buffer.put(&message[..]);
}
// Execute
// Execute a prepared statement prepared in `P` and bound in `B`.
'E' => {
self.buffer.put(&message[..]);
}
// Sync
// Frontend (client) is asking for the query result now.
'S' => {
debug!("Sending query to server");
self.buffer.put(&message[..]);
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
// Almost certainly true
if first_message_code == 'P' {
// Message layout
// P followed by 32 int followed by null-terminated statement name
// So message code should be in offset 0 of the buffer, first character
// in prepared statement name would be index 5
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
if first_char_in_name != 0 {
// This is a named prepared statement
// Server connection state will need to be cleared at checkin
server.mark_dirty();
}
}
match self.send_and_receive_loop(code, None, server, &address, &pool).await {
Ok(_) => self.retry_buffer = None,
Err(err) => {
if server.is_bad() && !server.in_transaction() && server.address().role == Role::Replica {
match self.retry_buffer {
Some(ref mut retry_buffer) => {
if retry_buffer.retry_count < 3 {
retry_buffer.retry_count += 1;
return Ok(ClientFlowControl::Retry);
}
self.retry_buffer = None;
return Err(err);
},
None => {
let buffer = self.buffer.clone();
self.buffer.clear();
self.retry_buffer = Some(RetryBuffer { buffer: message, retry_count: 0 });
return Ok(ClientFlowControl::Retry);
}
}
}
}
}
self.buffer.clear();
if !server.in_transaction() {
self.stats.transaction(self.process_id, server.server_id());
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
return Ok(ClientFlowControl::ReleaseConnection);
}
}
}
// CopyData
'd' => {
self.buffer.put(&message[..]);
// Want to limit buffer size
if self.buffer.len() > 8196 {
// Forward the data to the server,
self.send_server_message(server, &self.buffer, &address, &pool)
.await?;
self.buffer.clear();
}
}
// CopyDone or CopyFail
// Copy is done, successfully or not.
'c' | 'f' => {
// We may already have some copy data in the buffer, add this message to buffer
self.buffer.put(&message[..]);
self.send_server_message(server, &self.buffer, &address, &pool)
.await?;
// Clear the buffer
self.buffer.clear();
let response = self.receive_server_message(server, &address, &pool).await?;
match write_all_half(&mut self.write, &response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.in_transaction() {
self.stats.transaction(self.process_id, server.server_id());
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
return Ok(ClientFlowControl::ReleaseConnection);
}
}
}
// Some unexpected message. We either did not implement the protocol correctly
// or this is not a Postgres client we're talking to.
_ => {
return Err(Error::ProtocolSyncError("bad message code".to_string()));
}
}
return Ok(ClientFlowControl::PerformNextCommand);
}
/// Handle a connected and authenticated client.
pub async fn handle(&mut self) -> Result<(), Error> {
// The client wants to cancel a query it has issued previously.
if self.cancel_mode {
return self.cancel_query().await;
}
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
self.stats.client_register(
self.process_id,
self.pool_name.clone(),
self.username.clone(),
self.application_name.clone(),
);
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
loop {
self.client_proc(&mut query_router).await?;
}
}
/// Retrieve connection pool, if it exists.

View File

@@ -197,6 +197,9 @@ pub struct General {
#[serde(default = "General::default_ban_time")]
pub ban_time: i64,
#[serde(default = "General::default_idle_client_in_transaction_timeout")]
pub idle_client_in_transaction_timeout: u64,
#[serde(default = "General::default_worker_threads")]
pub worker_threads: usize,
@@ -260,6 +263,10 @@ impl General {
pub fn default_worker_threads() -> usize {
4
}
pub fn default_idle_client_in_transaction_timeout() -> u64 {
0
}
}
impl Default for General {
@@ -276,6 +283,7 @@ impl Default for General {
healthcheck_delay: Self::default_healthcheck_delay(),
ban_time: Self::default_ban_time(),
worker_threads: Self::default_worker_threads(),
idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(),
tcp_keepalives_idle: Self::default_tcp_keepalives_idle(),
tcp_keepalives_count: Self::default_tcp_keepalives_count(),
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
@@ -655,6 +663,13 @@ impl From<&Config> for std::collections::HashMap<String, String> {
config.general.healthcheck_delay.to_string(),
),
("ban_time".to_string(), config.general.ban_time.to_string()),
(
"idle_client_in_transaction_timeout".to_string(),
config
.general
.idle_client_in_transaction_timeout
.to_string(),
),
];
r.append(&mut static_settings);
@@ -666,6 +681,10 @@ impl Config {
/// Print current configuration.
pub fn show(&self) {
info!("Ban time: {}s", self.general.ban_time);
info!(
"Idle client in transaction timeout: {}ms",
self.general.idle_client_in_transaction_timeout
);
info!("Worker threads: {}", self.general.worker_threads);
info!(
"Healthcheck timeout: {}ms",
@@ -819,6 +838,12 @@ pub fn get_config() -> Config {
(*(*CONFIG.load())).clone()
}
pub fn get_idle_client_in_transaction_timeout() -> u64 {
(*(*CONFIG.load()))
.general
.idle_client_in_transaction_timeout
}
/// Parse the configuration file located at the path.
pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new();
@@ -889,6 +914,7 @@ mod test {
assert_eq!(get_config().path, "pgcat.toml".to_string());
assert_eq!(get_config().general.ban_time, 60);
assert_eq!(get_config().general.idle_client_in_transaction_timeout, 0);
assert_eq!(get_config().general.idle_timeout, 30000);
assert_eq!(get_config().pools.len(), 2);
assert_eq!(get_config().pools["sharded_db"].shards.len(), 3);

View File

@@ -309,4 +309,58 @@ describe "Miscellaneous" do
end
end
end
describe "Idle client timeout" do
context "idle transaction timeout set to 0" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
puts(current_configs["general"]["idle_client_in_transaction_timeout"])
current_configs["general"]["idle_client_in_transaction_timeout"] = 0
processes.pgcat.update_config(current_configs) # with timeout 0
processes.pgcat.reload_config
end
it "Allow client to be idle in transaction" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(2)
conn.async_exec("COMMIT")
conn.close
end
end
context "idle transaction timeout set to 500ms" do
before do
current_configs = processes.pgcat.current_config
correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
current_configs["general"]["idle_client_in_transaction_timeout"] = 500
processes.pgcat.update_config(current_configs) # with timeout 500
processes.pgcat.reload_config
end
it "Allow client to be idle in transaction below timeout" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(0.4) # below 500ms
conn.async_exec("COMMIT")
conn.close
end
it "Error when client idle in transaction time exceeds timeout" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.async_exec("SELECT 1")
sleep(1) # above 500ms
expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
conn.async_exec("SELECT 1") # should be able to send another query
conn.close
end
end
end
end