mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
Compare commits
4 Commits
mostafa_se
...
v1.0.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d5feac4b2 | ||
|
|
90aba9c011 | ||
|
|
0f34b49503 | ||
|
|
ca4431b67e |
@@ -15,6 +15,6 @@ jobs:
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
- name: Build CI Docker image
|
||||
run: |
|
||||
docker build . -f Dockerfile.ci --tag ghcr.io/levkk/pgcat-ci:latest
|
||||
docker run ghcr.io/levkk/pgcat-ci:latest
|
||||
docker push ghcr.io/levkk/pgcat-ci:latest
|
||||
docker build . -f Dockerfile.ci --tag ghcr.io/postgresml/pgcat-ci:latest
|
||||
docker run ghcr.io/postgresml/pgcat-ci:latest
|
||||
docker push ghcr.io/postgresml/pgcat-ci:latest
|
||||
|
||||
@@ -49,6 +49,14 @@ default: 30000 # milliseconds
|
||||
|
||||
How long an idle connection with a server is left open (ms).
|
||||
|
||||
### idle_client_in_transaction_timeout
|
||||
```
|
||||
path: general.idle_client_in_transaction_timeout
|
||||
default: 0 # milliseconds
|
||||
```
|
||||
|
||||
How long a client is allowed to be idle while in a transaction (ms).
|
||||
|
||||
### healthcheck_timeout
|
||||
```
|
||||
path: general.healthcheck_timeout
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -716,7 +716,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pgcat"
|
||||
version = "0.6.0-alpha1"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "pgcat"
|
||||
version = "0.6.0-alpha1"
|
||||
version = "1.0.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
2
LICENSE
2
LICENSE
@@ -1,4 +1,4 @@
|
||||
Copyright (c) 2022 Lev Kokotov <lev@levthe.dev>
|
||||
Copyright (c) 2023 PgCat Contributors
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
|
||||
398
README.md
398
README.md
@@ -1,33 +1,47 @@
|
||||
##### PgCat: PostgreSQL at petabyte scale
|
||||
## PgCat: Nextgen PostgreSQL Pooler
|
||||
|
||||
[](https://circleci.com/gh/levkk/pgcat/tree/main)
|
||||
[](https://circleci.com/gh/postgresml/pgcat/tree/main)
|
||||
<a href="https://discord.gg/DmyJP3qJ7U" target="_blank">
|
||||
<img src="https://img.shields.io/discord/1013868243036930099" alt="Join our Discord!" />
|
||||
</a>
|
||||
|
||||
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
|
||||
|
||||
**Beta**: looking for beta testers, see [#35](https://github.com/levkk/pgcat/issues/35).
|
||||
PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
||||
|
||||
## Features
|
||||
| **Feature** | **Status** | **Comments** |
|
||||
|--------------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| Transaction pooling | :white_check_mark: | Identical to PgBouncer. |
|
||||
| Session pooling | :white_check_mark: | Identical to PgBouncer. |
|
||||
| `COPY` support | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
|
||||
| Query cancellation | :white_check_mark: | Supported both in transaction and session pooling modes. |
|
||||
| Load balancing of read queries | :white_check_mark: | Using random between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
|
||||
| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
|
||||
| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
|
||||
| Statistics | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. |
|
||||
| Live configuration reloading | :white_check_mark: | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. |
|
||||
| Client authentication | :white_check_mark: :wrench: | MD5 password authentication is supported, SCRAM is on the roadmap; one user is used to connect to Postgres with both SCRAM and MD5 supported. |
|
||||
| Admin database | :white_check_mark: | The admin database, similar to PgBouncer's, allows to query for statistics and reload the configuration. |
|
||||
|
||||
| **Feature** | **Status** | **Comments** |
|
||||
|-------------|------------|--------------|
|
||||
| Transaction pooling | **Stable** | Identical to PgBouncer with notable improvements for handling bad clients and abandoned transactions. |
|
||||
| Session pooling | **Stable** | Identical to PgBouncer. |
|
||||
| Multi-threaded runtime | **Stable** | Using Tokio asynchronous runtime, the pooler takes advantage of multicore machines. |
|
||||
| Load balancing of read queries | **Stable** | Queries are automatically load balanced between replicas and the primary. |
|
||||
| Failover | **Stable** | Queries are automatically rerouted around broken replicas, validated by regular health checks. |
|
||||
| Admin database statistics | **Stable** | Pooler statistics and administration via the `pgbouncer` and `pgcat` databases. |
|
||||
| Prometheus statistics | **Stable** | Statistics are reported via a HTTP endpoint for Prometheus. |
|
||||
| Client TLS | **Stable** | Clients can connect to the pooler using TLS/SSL. |
|
||||
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
|
||||
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
|
||||
| Sharding using extended SQL syntax | **Experimental** | Clients can dynamically configure the pooler to route queries to specific shards. |
|
||||
| Sharding using comments parsing/Regex | **Experimental** | Clients can include shard information (sharding key, shard ID) in the query comments. |
|
||||
| Automatic sharding | **Experimental** | PgCat can parse queries, detect sharding keys automatically, and route queries to the right shard. |
|
||||
| Mirroring | **Experimental** | Mirror queries between multiple databases in order to test servers with realistic production traffic. |
|
||||
|
||||
|
||||
## Status
|
||||
|
||||
PgCat is stable and used in production to serve hundreds of thousands of queries per second. Some features remain experimental and are being actively developed. They are optional and can be enabled through configuration.
|
||||
|
||||
| | |
|
||||
|-|-|
|
||||
|<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f"><img src="./images/instacart.webp" height="70" width="auto"></a>|<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second"><img src="./images/postgresml.webp" height="70" width="auto"></a>|
|
||||
| [Instacart](https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f) | [PostgresML](https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second) |
|
||||
|
||||
## Deployment
|
||||
|
||||
See `Dockerfile` for example deployment using Docker. The pooler is configured to spawn 4 workers so 4 CPUs are recommended for optimal performance. That setting can be adjusted to spawn as many (or as few) workers as needed.
|
||||
|
||||
A Docker image is available from `docker pull ghcr.io/postgresml/pgcat:latest`. See our [Github packages repository](https://github.com/postgresml/pgcat/pkgs/container/pgcat).
|
||||
|
||||
For quick local example, use the Docker Compose environment provided:
|
||||
|
||||
```bash
|
||||
@@ -39,9 +53,13 @@ PGPASSWORD=postgres psql -h 127.0.0.1 -p 6432 -U postgres -c 'SELECT 1'
|
||||
|
||||
### Config
|
||||
|
||||
See [Configurations page](https://github.com/levkk/pgcat/blob/main/CONFIG.md)
|
||||
See **[Configuration](https://github.com/levkk/pgcat/blob/main/CONFIG.md)**.
|
||||
|
||||
## Local development
|
||||
## Contributing
|
||||
|
||||
The project is being actively developed and looking for additional contributors and production deployments.
|
||||
|
||||
### Local development
|
||||
|
||||
1. Install Rust (latest stable will work great).
|
||||
2. `cargo build --release` (to get better benchmarks).
|
||||
@@ -51,7 +69,7 @@ See [Configurations page](https://github.com/levkk/pgcat/blob/main/CONFIG.md)
|
||||
|
||||
### Tests
|
||||
|
||||
Quickest way to test your changes is to use pgbench:
|
||||
When making substantial modifications to the protocol implementation, make sure to test them with pgbench:
|
||||
|
||||
```
|
||||
pgbench -i -h 127.0.0.1 -p 6432 && \
|
||||
@@ -61,36 +79,26 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
|
||||
|
||||
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
||||
|
||||
Run `cargo test` to run Rust tests.
|
||||
Additionally, all features are covered by Ruby, Python, and Rust unit and integration tests.
|
||||
|
||||
Run `cargo test` to run Rust unit tests.
|
||||
|
||||
Run the following commands to run Ruby and Python integration tests:
|
||||
|
||||
Run the following commands to run Integration tests locally.
|
||||
```
|
||||
cd tests/docker/
|
||||
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
|
||||
```
|
||||
|
||||
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
||||
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
||||
| Session pooling | :white_check_mark: | :white_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. |
|
||||
| `COPY` | :white_check_mark: | :white_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
|
||||
| Query cancellation | :white_check_mark: | :white_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
|
||||
| Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. |
|
||||
| Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. |
|
||||
| Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for a Rails/ActiveRecord example. |
|
||||
| Statistics | :white_check_mark: | :white_check_mark: | Query the admin database with `psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS'`. |
|
||||
| Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. |
|
||||
### Docker-based local development
|
||||
|
||||
### Dev
|
||||
|
||||
Also, you can open a 'dev' environment where you can debug tests easier by running the following command:
|
||||
You can open a Docker development environment where you can debug tests easier. Run the following command to spin it up:
|
||||
|
||||
```
|
||||
./dev/script/console
|
||||
```
|
||||
|
||||
This will open a terminal in an environment similar to that used in tests. In there you can compile, run tests, do some debugging with the test environment, etc. Objects
|
||||
compiled inside the container (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have in your host.
|
||||
This will open a terminal in an environment similar to that used in tests. In there, you can compile the pooler, run tests, do some debugging with the test environment, etc. Objects compiled inside the container (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have on your machine.
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -105,11 +113,9 @@ In transaction mode, a client talks to one server for the duration of a single t
|
||||
This mode is enabled by default.
|
||||
|
||||
### Load balancing of read queries
|
||||
All queries are load balanced against the configured servers using the random algorithm. The most straight forward configuration example would be to put this pooler in front of several replicas and let it load balance all queries.
|
||||
All queries are load balanced against the configured servers using either the random or least open connections algorithms. The most straightforward configuration example would be to put this pooler in front of several replicas and let it load balance all queries.
|
||||
|
||||
If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser will interpret the query and route all `SELECT` queries to a replica, while all other queries including explicit transactions will be routed to the primary.
|
||||
|
||||
The query parser is disabled by default.
|
||||
If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser, implemented with the `sqlparser` crate, will interpret the query and route all `SELECT` queries to a replica, while all other queries including explicit transactions will be routed to the primary.
|
||||
|
||||
#### Query parser
|
||||
The query parser will do its best to determine where the query should go, but sometimes that's not possible. In that case, the client can select which server it wants using this custom SQL syntax:
|
||||
@@ -136,38 +142,14 @@ The setting will persist until it's changed again or the client disconnects.
|
||||
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
|
||||
|
||||
### Failover
|
||||
All servers are checked with a `SELECT 1` query before being given to a client. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
||||
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
||||
|
||||
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
||||
|
||||
Failover behavior can get pretty interesting (read complex) when multiple configurations and factors are involved. The table below will try to explain what PgCat does in each scenario:
|
||||
|
||||
| **Query** | **`SET SERVER ROLE TO`** | **`query_parser_enabled`** | **`primary_reads_enabled`** | **Target state** | **Outcome** |
|
||||
|---------------------------|--------------------------|----------------------------|-----------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| Read query, i.e. `SELECT` | unset (any) | false | false | up | Query is routed to the first instance in the random loop. |
|
||||
| Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the random loop. |
|
||||
| Read query | unset (any) | true | true | up | Query is routed to the first instance in the random loop. |
|
||||
| Read query | replica | false | false | up | Query is routed to the first replica instance in the random loop. |
|
||||
| Read query | primary | false | false | up | Query is routed to the primary. |
|
||||
| Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the random loop is attempted. |
|
||||
| Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
|
||||
| Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the random loop. |
|
||||
| Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
|
||||
| Read query | primary | false | false | down | The query is attempted against the primary and fails. The client receives an error. |
|
||||
| | | | | | |
|
||||
| Write query e.g. `INSERT` | unset (any) | false | false | up | The query is attempted against the first available instance in the random loop. If the instance is a replica, the query fails and the client receives an error. |
|
||||
| Write query | unset (any) | true | false | up | The query is routed to the primary. |
|
||||
| Write query | unset (any) | true | true | up | The query is routed to the primary. |
|
||||
| Write query | primary | false | false | up | The query is routed to the primary. |
|
||||
| Write query | replica | false | false | up | The query is routed to the replica and fails. The client receives an error. |
|
||||
| Write query | unset (any) | true | false | down | The query is routed to the primary and fails. The client receives an error. |
|
||||
| Write query | unset (any) | true | true | down | The query is routed to the primary and fails. The client receives an error. |
|
||||
| Write query | primary | false | false | down | The query is routed to the primary and fails. The client receives an error. |
|
||||
| | | | | | |
|
||||
|
||||
### Sharding
|
||||
We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
|
||||
|
||||
#### Extended syntax
|
||||
To route queries to a particular shard, we use this custom SQL syntax:
|
||||
|
||||
```sql
|
||||
@@ -182,7 +164,8 @@ The active shard will last until it's changed again or the client disconnects. B
|
||||
|
||||
For hash function implementation, see `src/sharding.rs` and `tests/sharding/partition_hash_test_setup.sql`.
|
||||
|
||||
#### ActiveRecord/Rails
|
||||
|
||||
##### ActiveRecord/Rails
|
||||
|
||||
```ruby
|
||||
class User < ActiveRecord::Base
|
||||
@@ -210,7 +193,7 @@ User.connection.execute "SET SERVER ROLE TO 'auto'"
|
||||
User.find_by_email("test@example.com")
|
||||
```
|
||||
|
||||
#### Raw SQL
|
||||
##### Raw SQL
|
||||
|
||||
```sql
|
||||
-- Grab a bunch of users from shard 1
|
||||
@@ -230,268 +213,45 @@ SET SERVER ROLE TO 'auto'; -- let the query router figure out where the query sh
|
||||
SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts until set again; we are reading from the primary
|
||||
```
|
||||
|
||||
#### With comments
|
||||
Issuing queries to the pooler can cause additional latency. To reduce its impact, it's possible to include sharding information inside SQL comments sent via the query. This is reasonably easy to implement with ORMs like [ActiveRecord](https://api.rubyonrails.org/classes/ActiveRecord/QueryMethods.html#method-i-annotate) and [SQLAlchemy](https://docs.sqlalchemy.org/en/20/core/events.html#sql-execution-and-connection-events).
|
||||
|
||||
```
|
||||
/* shard_id: 5 */ SELECT * FROM foo WHERE id = 1234;
|
||||
|
||||
/* sharding_key: 1234 */ SELECT * FROM foo WHERE id = 1234;
|
||||
```
|
||||
|
||||
#### Automatic query parsing
|
||||
PgCat can use the `sqlparser` crate to parse SQL queries and extract the sharding key. This is configurable with the `automatic_sharding_key` setting. This feature is still experimental, but it's the ideal implementation for sharding, requiring no client modifications.
|
||||
|
||||
### Statistics reporting
|
||||
|
||||
The stats are very similar to what Pgbouncer reports and the names are kept to be comparable. They are accessible by querying the admin database `pgcat`, and `pgbouncer` for compatibility.
|
||||
The stats are very similar to what PgBouncer reports and the names are kept to be comparable. They are accessible by querying the admin database `pgcat`, and `pgbouncer` for compatibility.
|
||||
|
||||
```
|
||||
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
|
||||
```
|
||||
|
||||
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
|
||||
|
||||
### Live configuration reloading
|
||||
|
||||
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. Not all settings are currently supported by live reload:
|
||||
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
|
||||
|
||||
| **Config** | **Requires restart** |
|
||||
|-------------------------|----------------------|
|
||||
| `host` | yes |
|
||||
| `port` | yes |
|
||||
| `pool_mode` | no |
|
||||
| `connect_timeout` | yes |
|
||||
| `healthcheck_timeout` | no |
|
||||
| `shutdown_timeout` | no |
|
||||
| `healthcheck_delay` | no |
|
||||
| `ban_time` | no |
|
||||
| `user` | yes |
|
||||
| `shards` | yes |
|
||||
| `default_role` | no |
|
||||
| `primary_reads_enabled` | no |
|
||||
| `query_parser_enabled` | no |
|
||||
### Mirroring
|
||||
|
||||
Mirroring allows routing queries to multiple databases at the same time. This is useful for prewarming replicas before placing them into the active configuration, or for testing different versions of Postgres with live traffic.
|
||||
|
||||
## Benchmarks
|
||||
## License
|
||||
|
||||
You can setup PgBench locally through PgCat:
|
||||
PgCat is free and open source, released under the MIT license.
|
||||
|
||||
```
|
||||
pgbench -h 127.0.0.1 -p 6432 -i
|
||||
```
|
||||
## Contributors
|
||||
|
||||
Coincidentally, this uses `COPY` so you can test if that works. Additionally, we'll be running the following PgBench configurations:
|
||||
Many thanks to our amazing contributors!
|
||||
|
||||
1. 16 clients, 2 threads
|
||||
2. 32 clients, 2 threads
|
||||
3. 64 clients, 2 threads
|
||||
4. 128 clients, 2 threads
|
||||
<a href = "https://github.com/postgresml/pgcat/graphs/contributors">
|
||||
<img src = "https://contrib.rocks/image?repo=postgresml/pgcat"/>
|
||||
</a>
|
||||
|
||||
All queries will be `SELECT` only (`-S`) just so disks don't get in the way, since the dataset will be effectively all in RAM.
|
||||
|
||||
My setup:
|
||||
|
||||
- 8 cores, 16 hyperthreaded (AMD Ryzen 5800X)
|
||||
- 32GB RAM (doesn't matter for this benchmark, except to prove that Postgres will fit the whole dataset into RAM)
|
||||
|
||||
### PgBouncer
|
||||
|
||||
#### Config
|
||||
|
||||
```ini
|
||||
[databases]
|
||||
shard0 = host=localhost port=5432 user=sharding_user password=sharding_user
|
||||
|
||||
[pgbouncer]
|
||||
pool_mode = transaction
|
||||
max_client_conn = 1000
|
||||
```
|
||||
|
||||
Everything else stays default.
|
||||
|
||||
#### Runs
|
||||
|
||||
|
||||
```
|
||||
$ pgbench -t 1000 -c 16 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 16
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 16000/16000
|
||||
latency average = 0.155 ms
|
||||
tps = 103417.377469 (including connections establishing)
|
||||
tps = 103510.639935 (excluding connections establishing)
|
||||
|
||||
|
||||
$ pgbench -t 1000 -c 32 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 32
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 32000/32000
|
||||
latency average = 0.290 ms
|
||||
tps = 110325.939785 (including connections establishing)
|
||||
tps = 110386.513435 (excluding connections establishing)
|
||||
|
||||
|
||||
$ pgbench -t 1000 -c 64 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 64
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 64000/64000
|
||||
latency average = 0.692 ms
|
||||
tps = 92470.427412 (including connections establishing)
|
||||
tps = 92618.389350 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 128 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 128
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 128000/128000
|
||||
latency average = 1.406 ms
|
||||
tps = 91013.429985 (including connections establishing)
|
||||
tps = 91067.583928 (excluding connections establishing)
|
||||
```
|
||||
|
||||
### PgCat
|
||||
|
||||
#### Config
|
||||
|
||||
The only thing that matters here is the number of workers in the Tokio pool. Make sure to set it to < than the number of your CPU cores.
|
||||
Also account for hyper-threading, so if you have that, take the number you got above and divide it by two, so that only "real" cores are serving
|
||||
requests.
|
||||
|
||||
My setup is 16 threads, 8 cores (`htop` shows as 16 CPUs), so I set the `max_workers` in Tokio to 4. Too many, and it starts conflicting with PgBench
|
||||
which is also running on the same system.
|
||||
|
||||
#### Runs
|
||||
|
||||
|
||||
```
|
||||
$ pgbench -t 1000 -c 16 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 16
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 16000/16000
|
||||
latency average = 0.164 ms
|
||||
tps = 97705.088232 (including connections establishing)
|
||||
tps = 97872.216045 (excluding connections establishing)
|
||||
|
||||
|
||||
$ pgbench -t 1000 -c 32 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
|
||||
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 32
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 32000/32000
|
||||
latency average = 0.288 ms
|
||||
tps = 111300.488119 (including connections establishing)
|
||||
tps = 111413.107800 (excluding connections establishing)
|
||||
|
||||
|
||||
$ pgbench -t 1000 -c 64 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
|
||||
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 64
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 64000/64000
|
||||
latency average = 0.556 ms
|
||||
tps = 115190.496139 (including connections establishing)
|
||||
tps = 115247.521295 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 128 -j 2 -p 6432 -h 127.0.0.1 -S --protocol extended
|
||||
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 128
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 128000/128000
|
||||
latency average = 1.135 ms
|
||||
tps = 112770.562239 (including connections establishing)
|
||||
tps = 112796.502381 (excluding connections establishing)
|
||||
```
|
||||
|
||||
### Direct Postgres
|
||||
|
||||
Always good to have a baseline.
|
||||
|
||||
#### Runs
|
||||
|
||||
```
|
||||
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 16
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 16000/16000
|
||||
latency average = 0.115 ms
|
||||
tps = 139443.955722 (including connections establishing)
|
||||
tps = 142314.859075 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 32
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 32000/32000
|
||||
latency average = 0.212 ms
|
||||
tps = 150644.840891 (including connections establishing)
|
||||
tps = 152218.499430 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 64
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 64000/64000
|
||||
latency average = 0.420 ms
|
||||
tps = 152517.663404 (including connections establishing)
|
||||
tps = 153319.188482 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
query mode: extended
|
||||
number of clients: 128
|
||||
number of threads: 2
|
||||
number of transactions per client: 1000
|
||||
number of transactions actually processed: 128000/128000
|
||||
latency average = 0.854 ms
|
||||
tps = 149818.594087 (including connections establishing)
|
||||
tps = 150200.603049 (excluding connections establishing)
|
||||
```
|
||||
|
||||
BIN
images/instacart.webp
Normal file
BIN
images/instacart.webp
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 3.4 KiB |
BIN
images/postgresml.webp
Normal file
BIN
images/postgresml.webp
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 4.7 KiB |
@@ -23,6 +23,9 @@ connect_timeout = 5000 # milliseconds
|
||||
# How long an idle connection with a server is left open (ms).
|
||||
idle_timeout = 30000 # milliseconds
|
||||
|
||||
# How long a client is allowed to be idle while in a transaction (ms).
|
||||
idle_client_in_transaction_timeout = 0 # milliseconds
|
||||
|
||||
# How much time to give the health check query to return with a result (ms).
|
||||
healthcheck_timeout = 1000 # milliseconds
|
||||
|
||||
|
||||
793
src/client.rs
793
src/client.rs
@@ -1,15 +1,10 @@
|
||||
use crate::errors::Error;
|
||||
use crate::pool::BanReason;
|
||||
use bb8::PooledConnection;
|
||||
/// Handle clients by pretending to be a PostgreSQL server.
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use hyper::server::conn;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use crate::pool::ServerPool;
|
||||
use crate::config::Role;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Add;
|
||||
use std::time::Instant;
|
||||
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
|
||||
use tokio::net::TcpStream;
|
||||
@@ -17,7 +12,7 @@ use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::admin::{generate_server_info_for_admin, handle_admin};
|
||||
use crate::config::{get_config, Address, PoolMode};
|
||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
||||
use crate::constants::*;
|
||||
|
||||
use crate::messages::*;
|
||||
@@ -36,19 +31,6 @@ enum ClientConnectionType {
|
||||
CancelQuery,
|
||||
}
|
||||
|
||||
struct RetryBuffer {
|
||||
buffer: BytesMut,
|
||||
retry_count: u32,
|
||||
}
|
||||
|
||||
pub enum ClientFlowControl {
|
||||
Retry,
|
||||
PerformNextCommand,
|
||||
ReleaseConnection,
|
||||
Disconnect
|
||||
}
|
||||
|
||||
|
||||
/// The client state. One of these is created per client.
|
||||
pub struct Client<S, T> {
|
||||
/// The reads are buffered (8K by default).
|
||||
@@ -110,8 +92,6 @@ pub struct Client<S, T> {
|
||||
|
||||
/// Used to notify clients about an impending shutdown
|
||||
shutdown: Receiver<()>,
|
||||
|
||||
retry_buffer: Option<RetryBuffer>,
|
||||
}
|
||||
|
||||
/// Client entrypoint.
|
||||
@@ -578,7 +558,6 @@ where
|
||||
application_name: application_name.to_string(),
|
||||
shutdown,
|
||||
connected_to_server: false,
|
||||
retry_buffer: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -613,184 +592,155 @@ where
|
||||
application_name: String::from("undefined"),
|
||||
shutdown,
|
||||
connected_to_server: false,
|
||||
retry_buffer: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Handle a connected and authenticated client.
|
||||
pub async fn handle(&mut self) -> Result<(), Error> {
|
||||
// The client wants to cancel a query it has issued previously.
|
||||
if self.cancel_mode {
|
||||
trace!("Sending CancelRequest");
|
||||
|
||||
async fn cancel_query(&mut self) -> Result<(), Error> {
|
||||
trace!("Sending CancelRequest");
|
||||
let (process_id, secret_key, address, port) = {
|
||||
let guard = self.client_server_map.lock();
|
||||
|
||||
let (process_id, secret_key, address, port) = {
|
||||
let guard = self.client_server_map.lock();
|
||||
match guard.get(&(self.process_id, self.secret_key)) {
|
||||
// Drop the mutex as soon as possible.
|
||||
// We found the server the client is using for its query
|
||||
// that it wants to cancel.
|
||||
Some((process_id, secret_key, address, port)) => {
|
||||
(*process_id, *secret_key, address.clone(), *port)
|
||||
}
|
||||
|
||||
match guard.get(&(self.process_id, self.secret_key)) {
|
||||
// Drop the mutex as soon as possible.
|
||||
// We found the server the client is using for its query
|
||||
// that it wants to cancel.
|
||||
Some((process_id, secret_key, address, port)) => {
|
||||
(*process_id, *secret_key, address.clone(), *port)
|
||||
// The client doesn't know / got the wrong server,
|
||||
// we're closing the connection for security reasons.
|
||||
None => return Ok(()),
|
||||
}
|
||||
};
|
||||
|
||||
// The client doesn't know / got the wrong server,
|
||||
// we're closing the connection for security reasons.
|
||||
None => return Ok(()),
|
||||
}
|
||||
};
|
||||
// Opens a new separate connection to the server, sends the backend_id
|
||||
// and secret_key and then closes it for security reasons. No other interactions
|
||||
// take place.
|
||||
return Server::cancel(&address, port, process_id, secret_key).await;
|
||||
}
|
||||
|
||||
// Opens a new separate connection to the server, sends the backend_id
|
||||
// and secret_key and then closes it for security reasons. No other interactions
|
||||
// take place.
|
||||
return Server::cancel(&address, port, process_id, secret_key).await;
|
||||
}
|
||||
|
||||
async fn checkout_connection(&mut self, pool: &ConnectionPool, query_router: &mut QueryRouter) -> Result<(PooledConnection<ServerPool>, Address), Error> {
|
||||
// Grab a server from the pool.
|
||||
let mut connection = match pool
|
||||
.get(query_router.shard(), query_router.role(), self.process_id)
|
||||
.await
|
||||
{
|
||||
Ok(conn) => {
|
||||
debug!("Got connection from pool");
|
||||
conn
|
||||
}
|
||||
Err(err) => {
|
||||
self.buffer.clear();
|
||||
error_response(&mut self.write, "could not get connection from the pool")
|
||||
.await?;
|
||||
|
||||
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
|
||||
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
let server = &mut connection.0;
|
||||
let address = connection.1.clone();
|
||||
|
||||
// Server is assigned to the client in case the client wants to
|
||||
// cancel a query later.
|
||||
server.claim(self.process_id, self.secret_key);
|
||||
self.connected_to_server = true;
|
||||
|
||||
// Update statistics
|
||||
self.stats
|
||||
.client_active(self.process_id, server.server_id());
|
||||
|
||||
self.last_address_id = Some(address.id);
|
||||
self.last_server_id = Some(server.server_id());
|
||||
|
||||
debug!(
|
||||
"Client {:?} talking to server {:?}",
|
||||
self.addr,
|
||||
server.address()
|
||||
// The query router determines where the query is going to go,
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
self.stats.client_register(
|
||||
self.process_id,
|
||||
self.pool_name.clone(),
|
||||
self.username.clone(),
|
||||
self.application_name.clone(),
|
||||
);
|
||||
|
||||
return Ok(connection);
|
||||
}
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
// or issue commands for our sharding and server selection protocol.
|
||||
loop {
|
||||
trace!(
|
||||
"Client idle, waiting for message, transaction mode: {}",
|
||||
self.transaction_mode
|
||||
);
|
||||
|
||||
async fn client_proc(&mut self, query_router: &mut QueryRouter) -> Result<ClientFlowControl, Error> {
|
||||
// Read a complete message from the client, which normally would be
|
||||
// either a `Q` (query) or `P` (prepare, extended protocol).
|
||||
// We can parse it here before grabbing a server from the pool,
|
||||
// in case the client is sending some custom protocol messages, e.g.
|
||||
// SET SHARDING KEY TO 'bigint';
|
||||
|
||||
trace!(
|
||||
"Client idle, waiting for message, transaction mode: {}",
|
||||
self.transaction_mode
|
||||
);
|
||||
let message = tokio::select! {
|
||||
_ = self.shutdown.recv() => {
|
||||
if !self.admin {
|
||||
error_response_terminal(
|
||||
&mut self.write,
|
||||
"terminating connection due to administrator command"
|
||||
).await?;
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// Read a complete message from the client, which normally would be
|
||||
// either a `Q` (query) or `P` (prepare, extended protocol).
|
||||
// We can parse it here before grabbing a server from the pool,
|
||||
// in case the client is sending some custom protocol messages, e.g.
|
||||
// SET SHARDING KEY TO 'bigint';
|
||||
let message = tokio::select! {
|
||||
_ = self.shutdown.recv() => {
|
||||
if !self.admin {
|
||||
error_response_terminal(
|
||||
&mut self.write,
|
||||
"terminating connection due to administrator command"
|
||||
).await?;
|
||||
return Ok(ClientFlowControl::Disconnect)
|
||||
} else {
|
||||
// Admin clients ignore shutdown.
|
||||
read_message(&mut self.read).await?
|
||||
}
|
||||
},
|
||||
else {
|
||||
read_message(&mut self.read).await?
|
||||
}
|
||||
},
|
||||
message_result = read_message(&mut self.read) => message_result?
|
||||
};
|
||||
|
||||
message_result = read_message(&mut self.read) => message_result?
|
||||
};
|
||||
|
||||
match message[0] as char {
|
||||
// Buffer extended protocol messages even if we do not have
|
||||
// a server connection yet. Hopefully, when we get the S message
|
||||
// we'll be able to allocate a connection. Also, clients do not expect
|
||||
// the server to respond to these messages so even if we were not able to
|
||||
// allocate a connection, we wouldn't be able to send back an error message
|
||||
// to the client so we buffer them and defer the decision to error out or not
|
||||
// to when we get the S message
|
||||
'D' | 'E' => {
|
||||
self.buffer.put(&message[..]);
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
}
|
||||
|
||||
'Q' => {
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer(&message);
|
||||
}
|
||||
}
|
||||
|
||||
'P' => {
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer(&message);
|
||||
match message[0] as char {
|
||||
// Buffer extended protocol messages even if we do not have
|
||||
// a server connection yet. Hopefully, when we get the S message
|
||||
// we'll be able to allocate a connection. Also, clients do not expect
|
||||
// the server to respond to these messages so even if we were not able to
|
||||
// allocate a connection, we wouldn't be able to send back an error message
|
||||
// to the client so we buffer them and defer the decision to error out or not
|
||||
// to when we get the S message
|
||||
'D' | 'E' => {
|
||||
self.buffer.put(&message[..]);
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
}
|
||||
|
||||
'B' => {
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer_shard_from_bind(&message);
|
||||
'Q' => {
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer(&message);
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
'P' => {
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer(&message);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
'B' => {
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer_shard_from_bind(&message);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
'X' => {
|
||||
debug!("Client disconnecting");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
|
||||
'X' => {
|
||||
debug!("Client disconnecting");
|
||||
return Ok(ClientFlowControl::Disconnect);
|
||||
// Handle admin database queries.
|
||||
if self.admin {
|
||||
debug!("Handling admin command");
|
||||
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
// Get a pool instance referenced by the most up-to-date
|
||||
// pointer. This ensures we always read the latest config
|
||||
// when starting a query.
|
||||
let mut pool = self.get_pool().await?;
|
||||
|
||||
// Handle admin database queries.
|
||||
if self.admin {
|
||||
debug!("Handling admin command");
|
||||
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
}
|
||||
// Check if the pool is paused and wait until it's resumed.
|
||||
if pool.wait_paused().await {
|
||||
// Refresh pool information, something might have changed.
|
||||
pool = self.get_pool().await?;
|
||||
}
|
||||
|
||||
// Get a pool instance referenced by the most up-to-date
|
||||
// pointer. This ensures we always read the latest config
|
||||
// when starting a query.
|
||||
let mut pool = self.get_pool().await?;
|
||||
query_router.update_pool_settings(pool.settings.clone());
|
||||
|
||||
// Check if the pool is paused and wait until it's resumed.
|
||||
if pool.wait_paused().await {
|
||||
// Refresh pool information, something might have changed.
|
||||
pool = self.get_pool().await?;
|
||||
}
|
||||
let current_shard = query_router.shard();
|
||||
|
||||
query_router.update_pool_settings(pool.settings.clone());
|
||||
// Handle all custom protocol commands, if any.
|
||||
match query_router.try_execute_command(&message) {
|
||||
// Normal query, not a custom command.
|
||||
None => (),
|
||||
|
||||
let current_shard = query_router.shard();
|
||||
|
||||
// Handle all custom protocol commands, if any.
|
||||
match query_router.try_execute_command(&message) {
|
||||
// Normal query, not a custom command.
|
||||
None => (),
|
||||
// SET SHARD TO
|
||||
Some((Command::SetShard, _)) => {
|
||||
// Selected shard is not configured.
|
||||
@@ -811,48 +761,97 @@ where
|
||||
} else {
|
||||
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
|
||||
}
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
continue;
|
||||
}
|
||||
|
||||
// SET PRIMARY READS TO
|
||||
Some((Command::SetPrimaryReads, _)) => {
|
||||
custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?;
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
continue;
|
||||
}
|
||||
|
||||
// SET SHARDING KEY TO
|
||||
Some((Command::SetShardingKey, _)) => {
|
||||
custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
continue;
|
||||
}
|
||||
|
||||
// SET SERVER ROLE TO
|
||||
Some((Command::SetServerRole, _)) => {
|
||||
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
continue;
|
||||
}
|
||||
|
||||
// SHOW SERVER ROLE
|
||||
Some((Command::ShowServerRole, value)) => {
|
||||
show_response(&mut self.write, "server role", &value).await?;
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
continue;
|
||||
}
|
||||
|
||||
// SHOW SHARD
|
||||
Some((Command::ShowShard, value)) => {
|
||||
show_response(&mut self.write, "shard", &value).await?;
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
continue;
|
||||
}
|
||||
|
||||
// SHOW PRIMARY READS
|
||||
Some((Command::ShowPrimaryReads, value)) => {
|
||||
show_response(&mut self.write, "primary reads", &value).await?;
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
debug!("Waiting for connection from pool");
|
||||
|
||||
// Grab a server from the pool.
|
||||
let connection = match pool
|
||||
.get(query_router.shard(), query_router.role(), self.process_id)
|
||||
.await
|
||||
{
|
||||
Ok(conn) => {
|
||||
debug!("Got connection from pool");
|
||||
conn
|
||||
}
|
||||
Err(err) => {
|
||||
// Client is attempting to get results from the server,
|
||||
// but we were unable to grab a connection from the pool
|
||||
// We'll send back an error message and clean the extended
|
||||
// protocol buffer
|
||||
if message[0] as char == 'S' {
|
||||
error!("Got Sync message but failed to get a connection from the pool");
|
||||
self.buffer.clear();
|
||||
}
|
||||
error_response(&mut self.write, "could not get connection from the pool")
|
||||
.await?;
|
||||
|
||||
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
|
||||
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut reference = connection.0;
|
||||
let address = connection.1;
|
||||
let server = &mut *reference;
|
||||
|
||||
// Server is assigned to the client in case the client wants to
|
||||
// cancel a query later.
|
||||
server.claim(self.process_id, self.secret_key);
|
||||
self.connected_to_server = true;
|
||||
|
||||
// Update statistics
|
||||
self.stats
|
||||
.client_active(self.process_id, server.server_id());
|
||||
|
||||
self.last_address_id = Some(address.id);
|
||||
self.last_server_id = Some(server.server_id());
|
||||
|
||||
debug!(
|
||||
"Client {:?} talking to server {:?}",
|
||||
self.addr,
|
||||
server.address()
|
||||
);
|
||||
|
||||
// TODO: investigate other parameters and set them too.
|
||||
|
||||
// Set application_name.
|
||||
@@ -860,6 +859,11 @@ where
|
||||
|
||||
let mut initial_message = Some(message);
|
||||
|
||||
let idle_client_timeout_duration = match get_idle_client_in_transaction_timeout() {
|
||||
0 => tokio::time::Duration::MAX,
|
||||
timeout => tokio::time::Duration::from_millis(timeout),
|
||||
};
|
||||
|
||||
// Transaction loop. Multiple queries can be issued by the client here.
|
||||
// The connection belongs to the client until the transaction is over,
|
||||
// or until the client disconnects if we are in session mode.
|
||||
@@ -867,19 +871,189 @@ where
|
||||
// If the client is in session mode, no more custom protocol
|
||||
// commands will be accepted.
|
||||
loop {
|
||||
match self.tx_proc(server, &pool, initial_message).await {
|
||||
Ok(control_flow) => {
|
||||
match control_flow {
|
||||
ClientFlowControl::PerformNextCommand => {
|
||||
initial_message = None;
|
||||
continue;
|
||||
let message = match initial_message {
|
||||
None => {
|
||||
trace!("Waiting for message inside transaction or in session mode");
|
||||
|
||||
match tokio::time::timeout(
|
||||
idle_client_timeout_duration,
|
||||
read_message(&mut self.read),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(message)) => message,
|
||||
Ok(Err(err)) => {
|
||||
// Client disconnected inside a transaction.
|
||||
// Clean up the server and re-use it.
|
||||
server.checkin_cleanup().await?;
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
control_flow_result => {
|
||||
return Ok(control_flow_result);
|
||||
Err(_) => {
|
||||
// Client idle in transaction timeout
|
||||
error_response(&mut self.write, "idle transaction timeout").await?;
|
||||
error!("Client idle in transaction timeout: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\"}}", self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role());
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
Some(message) => {
|
||||
initial_message = None;
|
||||
message
|
||||
}
|
||||
};
|
||||
|
||||
// The message will be forwarded to the server intact. We still would like to
|
||||
// parse it below to figure out what to do with it.
|
||||
|
||||
// Safe to unwrap because we know this message has a certain length and has the code
|
||||
// This reads the first byte without advancing the internal pointer and mutating the bytes
|
||||
let code = *message.get(0).unwrap() as char;
|
||||
|
||||
trace!("Message: {}", code);
|
||||
|
||||
match code {
|
||||
// Query
|
||||
'Q' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
self.send_and_receive_loop(code, Some(&message), server, &address, &pool)
|
||||
.await?;
|
||||
|
||||
if !server.in_transaction() {
|
||||
// Report transaction executed statistics.
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Terminate
|
||||
'X' => {
|
||||
server.checkin_cleanup().await?;
|
||||
self.release();
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Parse
|
||||
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
||||
'P' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Bind
|
||||
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
|
||||
'B' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Describe
|
||||
// Command a client can issue to describe a previously prepared named statement.
|
||||
'D' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Execute
|
||||
// Execute a prepared statement prepared in `P` and bound in `B`.
|
||||
'E' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Sync
|
||||
// Frontend (client) is asking for the query result now.
|
||||
'S' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
||||
|
||||
// Almost certainly true
|
||||
if first_message_code == 'P' {
|
||||
// Message layout
|
||||
// P followed by 32 int followed by null-terminated statement name
|
||||
// So message code should be in offset 0 of the buffer, first character
|
||||
// in prepared statement name would be index 5
|
||||
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
|
||||
if first_char_in_name != 0 {
|
||||
// This is a named prepared statement
|
||||
// Server connection state will need to be cleared at checkin
|
||||
server.mark_dirty();
|
||||
}
|
||||
}
|
||||
|
||||
self.send_and_receive_loop(code, None, server, &address, &pool)
|
||||
.await?;
|
||||
|
||||
self.buffer.clear();
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CopyData
|
||||
'd' => {
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
// Want to limit buffer size
|
||||
if self.buffer.len() > 8196 {
|
||||
// Forward the data to the server,
|
||||
self.send_server_message(server, &self.buffer, &address, &pool)
|
||||
.await?;
|
||||
self.buffer.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// CopyDone or CopyFail
|
||||
// Copy is done, successfully or not.
|
||||
'c' | 'f' => {
|
||||
// We may already have some copy data in the buffer, add this message to buffer
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
self.send_server_message(server, &self.buffer, &address, &pool)
|
||||
.await?;
|
||||
|
||||
// Clear the buffer
|
||||
self.buffer.clear();
|
||||
|
||||
let response = self.receive_server_message(server, &address, &pool).await?;
|
||||
|
||||
match write_all_half(&mut self.write, &response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Some unexpected message. We either did not implement the protocol correctly
|
||||
// or this is not a Postgres client we're talking to.
|
||||
_ => {
|
||||
error!("Unexpected code: {}", code);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -892,249 +1066,6 @@ where
|
||||
self.release();
|
||||
self.stats.client_idle(self.process_id);
|
||||
}
|
||||
|
||||
|
||||
#[inline(always)]
|
||||
pub async fn tx_proc(&mut self, server: &mut Server, pool: &ConnectionPool, initial_message: Option<BytesMut> ) -> Result<ClientFlowControl, Error> {
|
||||
let message = match initial_message {
|
||||
None => {
|
||||
trace!("Waiting for message inside transaction or in session mode");
|
||||
|
||||
match read_message(&mut self.read).await {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
// Client disconnected inside a transaction.
|
||||
// Clean up the server and re-use it.
|
||||
server.checkin_cleanup().await?;
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(message) => message
|
||||
};
|
||||
|
||||
// The message will be forwarded to the server intact. We still would like to
|
||||
// parse it below to figure out what to do with it.
|
||||
|
||||
// Safe to unwrap because we know this message has a certain length and has the code
|
||||
// This reads the first byte without advancing the internal pointer and mutating the bytes
|
||||
let code = *message.get(0).unwrap() as char;
|
||||
let address = server.address();
|
||||
trace!("Message: {}", code);
|
||||
|
||||
match code {
|
||||
// Query
|
||||
'Q' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
match self.send_and_receive_loop(code, Some(&message), server, &address, &pool).await {
|
||||
Ok(_) => self.retry_buffer = None,
|
||||
Err(_) => {
|
||||
if server.is_bad() && !server.in_transaction() && server.address().role == Role::Replica {
|
||||
match self.retry_buffer {
|
||||
Some(ref mut retry_buffer) => {
|
||||
if retry_buffer.retry_count < 3 {
|
||||
retry_buffer.retry_count += 1;
|
||||
return Ok(ClientFlowControl::Retry);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
self.retry_buffer = Some(RetryBuffer { buffer: message, retry_count: 0 });
|
||||
return Ok(ClientFlowControl::Retry);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
if !server.in_transaction() {
|
||||
// Report transaction executed statistics.
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
return Ok(ClientFlowControl::ReleaseConnection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Terminate
|
||||
'X' => {
|
||||
server.checkin_cleanup().await?;
|
||||
self.release();
|
||||
|
||||
return Ok(ClientFlowControl::Disconnect);
|
||||
}
|
||||
|
||||
// Parse
|
||||
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
||||
'P' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Bind
|
||||
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
|
||||
'B' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Describe
|
||||
// Command a client can issue to describe a previously prepared named statement.
|
||||
'D' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Execute
|
||||
// Execute a prepared statement prepared in `P` and bound in `B`.
|
||||
'E' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Sync
|
||||
// Frontend (client) is asking for the query result now.
|
||||
'S' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
||||
|
||||
// Almost certainly true
|
||||
if first_message_code == 'P' {
|
||||
// Message layout
|
||||
// P followed by 32 int followed by null-terminated statement name
|
||||
// So message code should be in offset 0 of the buffer, first character
|
||||
// in prepared statement name would be index 5
|
||||
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
|
||||
if first_char_in_name != 0 {
|
||||
// This is a named prepared statement
|
||||
// Server connection state will need to be cleared at checkin
|
||||
server.mark_dirty();
|
||||
}
|
||||
}
|
||||
|
||||
match self.send_and_receive_loop(code, None, server, &address, &pool).await {
|
||||
Ok(_) => self.retry_buffer = None,
|
||||
Err(err) => {
|
||||
if server.is_bad() && !server.in_transaction() && server.address().role == Role::Replica {
|
||||
match self.retry_buffer {
|
||||
Some(ref mut retry_buffer) => {
|
||||
if retry_buffer.retry_count < 3 {
|
||||
retry_buffer.retry_count += 1;
|
||||
return Ok(ClientFlowControl::Retry);
|
||||
}
|
||||
self.retry_buffer = None;
|
||||
return Err(err);
|
||||
},
|
||||
None => {
|
||||
let buffer = self.buffer.clone();
|
||||
self.buffer.clear();
|
||||
self.retry_buffer = Some(RetryBuffer { buffer: message, retry_count: 0 });
|
||||
return Ok(ClientFlowControl::Retry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.buffer.clear();
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
return Ok(ClientFlowControl::ReleaseConnection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CopyData
|
||||
'd' => {
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
// Want to limit buffer size
|
||||
if self.buffer.len() > 8196 {
|
||||
// Forward the data to the server,
|
||||
self.send_server_message(server, &self.buffer, &address, &pool)
|
||||
.await?;
|
||||
self.buffer.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// CopyDone or CopyFail
|
||||
// Copy is done, successfully or not.
|
||||
'c' | 'f' => {
|
||||
// We may already have some copy data in the buffer, add this message to buffer
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
self.send_server_message(server, &self.buffer, &address, &pool)
|
||||
.await?;
|
||||
|
||||
// Clear the buffer
|
||||
self.buffer.clear();
|
||||
|
||||
let response = self.receive_server_message(server, &address, &pool).await?;
|
||||
|
||||
match write_all_half(&mut self.write, &response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
return Ok(ClientFlowControl::ReleaseConnection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Some unexpected message. We either did not implement the protocol correctly
|
||||
// or this is not a Postgres client we're talking to.
|
||||
_ => {
|
||||
return Err(Error::ProtocolSyncError("bad message code".to_string()));
|
||||
}
|
||||
}
|
||||
return Ok(ClientFlowControl::PerformNextCommand);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// Handle a connected and authenticated client.
|
||||
pub async fn handle(&mut self) -> Result<(), Error> {
|
||||
// The client wants to cancel a query it has issued previously.
|
||||
if self.cancel_mode {
|
||||
return self.cancel_query().await;
|
||||
}
|
||||
|
||||
// The query router determines where the query is going to go,
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
self.stats.client_register(
|
||||
self.process_id,
|
||||
self.pool_name.clone(),
|
||||
self.username.clone(),
|
||||
self.application_name.clone(),
|
||||
);
|
||||
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
// or issue commands for our sharding and server selection protocol.
|
||||
|
||||
loop {
|
||||
|
||||
self.client_proc(&mut query_router).await?;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve connection pool, if it exists.
|
||||
|
||||
@@ -197,6 +197,9 @@ pub struct General {
|
||||
#[serde(default = "General::default_ban_time")]
|
||||
pub ban_time: i64,
|
||||
|
||||
#[serde(default = "General::default_idle_client_in_transaction_timeout")]
|
||||
pub idle_client_in_transaction_timeout: u64,
|
||||
|
||||
#[serde(default = "General::default_worker_threads")]
|
||||
pub worker_threads: usize,
|
||||
|
||||
@@ -260,6 +263,10 @@ impl General {
|
||||
pub fn default_worker_threads() -> usize {
|
||||
4
|
||||
}
|
||||
|
||||
    /// Default for `general.idle_client_in_transaction_timeout` (milliseconds).
    /// 0 disables the check: clients may sit idle inside a transaction
    /// indefinitely without being disconnected.
    pub fn default_idle_client_in_transaction_timeout() -> u64 {
        0
    }
|
||||
}
|
||||
|
||||
impl Default for General {
|
||||
@@ -276,6 +283,7 @@ impl Default for General {
|
||||
healthcheck_delay: Self::default_healthcheck_delay(),
|
||||
ban_time: Self::default_ban_time(),
|
||||
worker_threads: Self::default_worker_threads(),
|
||||
idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(),
|
||||
tcp_keepalives_idle: Self::default_tcp_keepalives_idle(),
|
||||
tcp_keepalives_count: Self::default_tcp_keepalives_count(),
|
||||
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
|
||||
@@ -655,6 +663,13 @@ impl From<&Config> for std::collections::HashMap<String, String> {
|
||||
config.general.healthcheck_delay.to_string(),
|
||||
),
|
||||
("ban_time".to_string(), config.general.ban_time.to_string()),
|
||||
(
|
||||
"idle_client_in_transaction_timeout".to_string(),
|
||||
config
|
||||
.general
|
||||
.idle_client_in_transaction_timeout
|
||||
.to_string(),
|
||||
),
|
||||
];
|
||||
|
||||
r.append(&mut static_settings);
|
||||
@@ -666,6 +681,10 @@ impl Config {
|
||||
/// Print current configuration.
|
||||
pub fn show(&self) {
|
||||
info!("Ban time: {}s", self.general.ban_time);
|
||||
info!(
|
||||
"Idle client in transaction timeout: {}ms",
|
||||
self.general.idle_client_in_transaction_timeout
|
||||
);
|
||||
info!("Worker threads: {}", self.general.worker_threads);
|
||||
info!(
|
||||
"Healthcheck timeout: {}ms",
|
||||
@@ -819,6 +838,12 @@ pub fn get_config() -> Config {
|
||||
(*(*CONFIG.load())).clone()
|
||||
}
|
||||
|
||||
/// Read the current `general.idle_client_in_transaction_timeout` (ms)
/// from the globally shared configuration.
/// A value of 0 means the timeout is disabled.
// NOTE(review): CONFIG appears to be an atomically swappable handle
// (`load()` snapshot, same pattern as `get_config()` above), so this is
// cheap to call on every poll — confirm against CONFIG's declaration.
pub fn get_idle_client_in_transaction_timeout() -> u64 {
    (*(*CONFIG.load()))
        .general
        .idle_client_in_transaction_timeout
}
|
||||
|
||||
/// Parse the configuration file located at the path.
|
||||
pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
let mut contents = String::new();
|
||||
@@ -889,6 +914,7 @@ mod test {
|
||||
assert_eq!(get_config().path, "pgcat.toml".to_string());
|
||||
|
||||
assert_eq!(get_config().general.ban_time, 60);
|
||||
assert_eq!(get_config().general.idle_client_in_transaction_timeout, 0);
|
||||
assert_eq!(get_config().general.idle_timeout, 30000);
|
||||
assert_eq!(get_config().pools.len(), 2);
|
||||
assert_eq!(get_config().pools["sharded_db"].shards.len(), 3);
|
||||
|
||||
@@ -309,4 +309,58 @@ describe "Miscellaneous" do
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Integration tests for the `idle_client_in_transaction_timeout` setting:
# pgcat should disconnect (with an error) clients that sit idle inside an
# open transaction for longer than the configured number of milliseconds,
# and leave them alone when the setting is 0 (disabled).
describe "Idle client timeout" do
  context "idle transaction timeout set to 0" do
    before do
      # Removed: unused local capturing the previous timeout value and a
      # leftover debug `puts` of the same setting.
      current_configs = processes.pgcat.current_config
      current_configs["general"]["idle_client_in_transaction_timeout"] = 0

      processes.pgcat.update_config(current_configs) # with timeout 0
      processes.pgcat.reload_config
    end

    it "Allow client to be idle in transaction" do
      conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      conn.async_exec("BEGIN")
      conn.async_exec("SELECT 1")
      sleep(2) # no timeout configured, so any idle period is fine
      conn.async_exec("COMMIT")
      conn.close
    end
  end

  context "idle transaction timeout set to 500ms" do
    before do
      current_configs = processes.pgcat.current_config
      current_configs["general"]["idle_client_in_transaction_timeout"] = 500

      processes.pgcat.update_config(current_configs) # with timeout 500
      processes.pgcat.reload_config
    end

    it "Allow client to be idle in transaction below timeout" do
      conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      conn.async_exec("BEGIN")
      conn.async_exec("SELECT 1")
      sleep(0.4) # below 500ms
      conn.async_exec("COMMIT")
      conn.close
    end

    it "Error when client idle in transaction time exceeds timeout" do
      conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      conn.async_exec("BEGIN")
      conn.async_exec("SELECT 1")
      sleep(1) # above 500ms
      expect { conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
      conn.async_exec("SELECT 1") # should be able to send another query
      conn.close
    end
  end
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user