Compare commits

..

16 Commits

Author SHA1 Message Date
Mostafa
e299a2e71a fmt 2025-02-27 07:26:17 -06:00
Mostafa
00ac44427f clippy 2025-02-27 07:24:23 -06:00
Mostafa
147eba52c0 Add checkout_failure_limit config/feature 2025-02-27 07:17:10 -06:00
Alex Kesling
f8e2fcd0ed s/Iniitalize/Initialize/ (#897) 2025-01-09 11:59:42 -08:00
Nadav Shatz
3202f5685b Add DB activity based routing (#864) 2024-12-22 05:23:57 -06:00
Gabriel Simmer
b37d105184 chore(deps): bump sqlparser from 0.41.0 to 0.52.0 (#870)
* chore(deps): bump sqlparser from 0.41.0 to 0.52.0

Bumps [sqlparser](https://github.com/apache/datafusion-sqlparser-rs) from 0.41.0 to 0.52.0.
- [Changelog](https://github.com/apache/datafusion-sqlparser-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/apache/datafusion-sqlparser-rs/commits)

---
updated-dependencies:
- dependency-name: sqlparser
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* bump

* Update to latest sqlparser version

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Mostafa <mostafa.mohmmed@gmail.com>
2024-11-23 07:25:37 -06:00
Vitalii Tverdokhlib
32f4752daf DOCS: small typo LIMT (Update README.md) (#866) 2024-11-17 06:51:05 -06:00
Mostafa
3796e26402 Fix contact info for Helm chart (#861)
* Fix contact info for Helm chart
2024-11-11 09:24:17 -06:00
Mostafa
0ee59c0c40 Another no-op helm release (#853) 2024-11-08 06:07:12 -06:00
Mostafa
b61d2cc6f0 Use main branch for helm chart releases (#852) 2024-11-08 06:04:42 -06:00
Jose Fernández
c11418c083 Revert "Do not unban replicas if a primary is available" (#850)
Revert "Do not unban replicas if a primary is available (#843)"

This reverts commit cdcfa99fb9.
2024-11-07 22:00:43 +01:00
Jose Fernández
c9544bdff2 Fix default_role being ignored when query_parser_enabled was false (#847)
Fix default_role being ignored when query_parser_enabled was false
2024-11-07 11:11:49 -06:00
Jose Fernández
cdcfa99fb9 Do not unban replicas if a primary is available (#843)
Add `unban_replicas_when_all_banned` to control unbanning replicas behavior.
2024-11-07 11:11:11 -06:00
Víťa Tauer
f27dc6b483 Fixing invalid setting name in pgcat.toml (#849) 2024-11-07 06:17:09 -06:00
Mostafa
326efc22b3 Another no-op release for helm (#845)
Another no-op release
2024-11-02 18:05:41 -05:00
Mostafa
01c6afb2e5 Attempt a helm chart release (#844)
Attempt a release

Co-authored-by: Mostafa <no_reply@github.com>
2024-11-02 11:55:18 -05:00
17 changed files with 994 additions and 89 deletions

View File

@@ -22,7 +22,7 @@ jobs:
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and # Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
# yamllint (https://github.com/adrienverge/yamllint) which require Python # yamllint (https://github.com/adrienverge/yamllint) which require Python
- name: Set up Python - name: Set up Python
uses: actions/setup-python@v5.3.0 uses: actions/setup-python@v5.1.0
with: with:
python-version: 3.7 python-version: 3.7

View File

@@ -298,6 +298,19 @@ Load balancing mode
`random` selects the server at random `random` selects the server at random
`loc` selects the server with the least outstanding busy connections `loc` selects the server with the least outstanding busy connections
### checkout_failure_limit
```
path: pools.<pool_name>.checkout_failure_limit
default: 0 (disabled)
```
`Maximum number of checkout failures a client is allowed before it
gets disconnected. This is needed to prevent persistent client/server
imbalance in high availability setups where multiple PgCat instances are placed
behind a single load balancer. If for any reason a client lands on a PgCat instance that has
a large number of connected clients, it might get stuck in a perpetual checkout failure loop,
especially in session mode.
`
### default_role ### default_role
``` ```
path: pools.<pool_name>.default_role path: pools.<pool_name>.default_role
@@ -309,6 +322,45 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary, `replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified. `primary` all queries go to the primary unless otherwise specified.
### db_activity_based_routing
```
path: pools.<pool_name>.db_activity_based_routing
default: false
```
If enabled, PgCat will route queries to the primary if the queried table was recently written to.
Only relevant when `query_parser_enabled` *and* `query_parser_read_write_splitting` are enabled.
##### Considerations:
- *This feature is experimental and may not work as expected.*
- This feature only works when the same PgCat instance is used for both reads and writes to the database.
- This feature is not relevant when the primary is not part of the pool of databases used for load balancing of read queries.
- If more than one PgCat instance is used for HA purposes, this feature will not work as expected. A way to still make it work is by using sticky sessions.
### db_activity_based_ms_init_delay
```
path: pools.<pool_name>.db_activity_based_ms_init_delay
default: 100
```
The delay in milliseconds before the first activity-based routing check is performed.
### db_activity_ttl
```
path: pools.<pool_name>.db_activity_ttl
default: 900
```
The time in seconds after which a DB is considered inactive when no queries/updates are performed to it.
### table_mutation_cache_ms_ttl
```
path: pools.<pool_name>.table_mutation_cache_ms_ttl
default: 50
```
The time in milliseconds after a write to a table that all queries to that table will be routed to the primary.
### prepared_statements_cache_size ### prepared_statements_cache_size
``` ```
path: general.prepared_statements_cache_size path: general.prepared_statements_cache_size

295
Cargo.lock generated
View File

@@ -132,7 +132,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -143,7 +143,7 @@ checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -229,6 +229,12 @@ version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "bytecount"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.4.3" version = "1.4.3"
@@ -241,6 +247,37 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "camino"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3"
dependencies = [
"serde",
]
[[package]]
name = "cargo-platform"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc"
dependencies = [
"serde",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver",
"serde",
"serde_json",
]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.79" version = "1.0.79"
@@ -300,7 +337,7 @@ dependencies = [
"heck", "heck",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -330,6 +367,21 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.6" version = "0.1.6"
@@ -340,6 +392,19 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.4.0" version = "2.4.0"
@@ -402,6 +467,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]] [[package]]
name = "exitcode" name = "exitcode"
version = "1.1.2" version = "1.1.2"
@@ -414,6 +488,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@@ -485,7 +565,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -545,6 +625,12 @@ version = "0.27.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.6" version = "0.4.6"
@@ -918,6 +1004,21 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "mini-moka"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803"
dependencies = [
"crossbeam-channel",
"crossbeam-utils",
"dashmap",
"skeptic",
"smallvec",
"tagptr",
"triomphe",
]
[[package]] [[package]]
name = "miniz_oxide" name = "miniz_oxide"
version = "0.7.1" version = "0.7.1"
@@ -992,9 +1093,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.18.0" version = "1.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
[[package]] [[package]]
name = "overload" name = "overload"
@@ -1033,7 +1134,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]] [[package]]
name = "pgcat" name = "pgcat"
version = "1.2.0" version = "1.3.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@@ -1055,6 +1156,7 @@ dependencies = [
"log", "log",
"lru", "lru",
"md-5", "md-5",
"mini-moka",
"nix", "nix",
"num_cpus", "num_cpus",
"once_cell", "once_cell",
@@ -1069,6 +1171,7 @@ dependencies = [
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"serial_test",
"sha-1", "sha-1",
"sha2", "sha2",
"socket2 0.4.9", "socket2 0.4.9",
@@ -1114,7 +1217,7 @@ dependencies = [
"phf_shared", "phf_shared",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -1143,7 +1246,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -1184,13 +1287,24 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.66" version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "pulldown-cmark"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
dependencies = [
"bitflags 2.3.3",
"memchr",
"unicase",
]
[[package]] [[package]]
name = "quick-error" name = "quick-error"
version = "1.2.3" version = "1.2.3"
@@ -1199,9 +1313,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.31" version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe8a65d69dd0808184ebb5f836ab526bb259db23c657efa38711b1072ee47f0" checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@@ -1380,6 +1494,24 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "scc"
version = "2.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8d25269dd3a12467afe2e510f69fb0b46b698e5afb296b59f2145259deaf8e8"
dependencies = [
"sdd",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
@@ -1397,23 +1529,38 @@ dependencies = [
] ]
[[package]] [[package]]
name = "serde" name = "sdd"
version = "1.0.171" version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" checksum = "49c1eeaf4b6a87c7479688c6d52b9f1153cedd3c489300564f932b065c6eab95"
[[package]]
name = "semver"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
dependencies = [
"serde",
]
[[package]]
name = "serde"
version = "1.0.214"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.171" version = "1.0.214"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -1436,6 +1583,31 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serial_test"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b4b487fe2acf240a021cf57c6b2b4903b1e78ca0ecd862a71b71d2a51fed77d"
dependencies = [
"futures",
"log",
"once_cell",
"parking_lot",
"scc",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
]
[[package]] [[package]]
name = "sha-1" name = "sha-1"
version = "0.10.1" version = "0.10.1"
@@ -1482,6 +1654,21 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.8" version = "0.4.8"
@@ -1525,9 +1712,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "sqlparser" name = "sqlparser"
version = "0.41.0" version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08"
dependencies = [ dependencies = [
"log", "log",
"sqlparser_derive", "sqlparser_derive",
@@ -1541,7 +1728,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -1585,15 +1772,34 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.26" version = "2.0.87"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tempfile"
version = "3.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
dependencies = [
"cfg-if",
"fastrand",
"redox_syscall",
"rustix",
"windows-sys",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.43" version = "1.0.43"
@@ -1611,7 +1817,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -1678,7 +1884,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -1783,7 +1989,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
] ]
[[package]] [[package]]
@@ -1838,6 +2044,12 @@ dependencies = [
"tracing-serde", "tracing-serde",
] ]
[[package]]
name = "triomphe"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3"
[[package]] [[package]]
name = "trust-dns-proto" name = "trust-dns-proto"
version = "0.22.0" version = "0.22.0"
@@ -1895,6 +2107,12 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "unicase"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df"
[[package]] [[package]]
name = "unicode-bidi" name = "unicode-bidi"
version = "0.3.13" version = "0.3.13"
@@ -1951,6 +2169,16 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]] [[package]]
name = "want" name = "want"
version = "0.3.1" version = "0.3.1"
@@ -1993,7 +2221,7 @@ dependencies = [
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@@ -2015,7 +2243,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.26", "syn 2.0.87",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@@ -2067,6 +2295,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "pgcat" name = "pgcat"
version = "1.2.0" version = "1.3.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1" regex = "1"
num_cpus = "1" num_cpus = "1"
once_cell = "1" once_cell = "1"
sqlparser = { version = "0.41", features = ["visitor"] } sqlparser = { version = "0.52", features = ["visitor"] }
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"
parking_lot = "0.12.1" parking_lot = "0.12.1"
@@ -55,6 +55,10 @@ tracing-subscriber = { version = "0.3.17", features = [
"std", "std",
] } ] }
lru = "0.12.0" lru = "0.12.0"
mini-moka = "0.10.3"
[target.'cfg(not(target_env = "msvc"))'.dependencies] [target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0" jemallocator = "0.5.0"
[dev-dependencies]
serial_test = "*"

View File

@@ -231,7 +231,7 @@ User.find_by_email("test@example.com")
```sql ```sql
-- Grab a bunch of users from shard 1 -- Grab a bunch of users from shard 1
SET SHARD TO '1'; SET SHARD TO '1';
SELECT * FROM users LIMT 10; SELECT * FROM users LIMIT 10;
-- Find by id -- Find by id
SET SHARDING KEY TO '1234'; SET SHARDING KEY TO '1234';

View File

@@ -2,7 +2,7 @@ apiVersion: v2
name: pgcat name: pgcat
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring. description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
maintainers: maintainers:
- name: Wildcard - name: PostgresML
email: support@w6d.io email: team@postgresml.org
appVersion: "1.2.0" appVersion: "1.3.0"
version: 0.2.1 version: 0.2.5

View File

@@ -51,6 +51,10 @@ stringData:
query_parser_enabled = {{ default true $pool.query_parser_enabled }} query_parser_enabled = {{ default true $pool.query_parser_enabled }}
query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }} query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }}
primary_reads_enabled = {{ default true $pool.primary_reads_enabled }} primary_reads_enabled = {{ default true $pool.primary_reads_enabled }}
db_activity_based_routing = {{ default false $pool.db_activity_based_routing }}
db_activity_based_ms_init_delay = {{ default 100 $pool.db_activity_based_ms_init_delay }}
db_activity_ttl = {{ default 900 $pool.db_activity_ttl }}
table_mutation_cache_ttl = {{ default 50 $pool.table_mutation_cache_ttl }}
sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }} sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }}
{{- range $index, $user := $pool.users }} {{- range $index, $user := $pool.users }}

View File

@@ -298,6 +298,22 @@ configuration:
# ## @param configuration.poolsPostgres.query_parser_read_write_splitting # ## @param configuration.poolsPostgres.query_parser_read_write_splitting
# query_parser_read_write_splitting: true # query_parser_read_write_splitting: true
# ## Db activity based routing. If enabled, we'll route queries to the primary if the table was recently mutated.
# ## @param configuration.poolsPostgres.db_activity_based_routing
# db_activity_based_routing: false
# ## DB activity based init delay. How long to wait before starting to route queries to the primary after a table mutation.
# ## @param configuration.poolsPostgres.db_activity_based_ms_init_delay
# db_activity_based_ms_init_delay: 100
# ## DB activity TTL. How long before marking the DB as inactive after no mutations or queries.
# ## @param configuration.poolsPostgres.db_activity_ttl
# db_activity_ttl: 900
# ## Table mutation cache TTL. How long to keep track of table mutations.
# ## @param configuration.poolsPostgres.table_mutation_cache_ttl
# table_mutation_cache_ttl: 50
# ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for # ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# ## load balancing of read queries. Otherwise, the primary will only be used for write # ## load balancing of read queries. Otherwise, the primary will only be used for write
# ## queries. The primary can always be explicitly selected with our custom protocol. # ## queries. The primary can always be explicitly selected with our custom protocol.

View File

@@ -1 +1,2 @@
sign: false sign: false
pages_branch: main

View File

@@ -179,7 +179,7 @@ primary_reads_enabled = true
# `random`: picks a shard at random # `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors # `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime # `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# no_shard_specified_behavior = "shard_0" # default_shard = "shard_0"
# So what if you wanted to implement a different hashing function, # So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it? # or you've already built one and you want this pooler to use it?

View File

@@ -859,6 +859,8 @@ where
// e.g. primary, replica, which shard. // e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new(); let mut query_router = QueryRouter::new();
let mut checkout_failure_count: u64 = 0;
self.stats.register(self.stats.clone()); self.stats.register(self.stats.clone());
// Result returned by one of the plugins. // Result returned by one of the plugins.
@@ -881,6 +883,7 @@ where
}; };
query_router.update_pool_settings(&pool.settings); query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop. // Our custom protocol loop.
// We expect the client to either start a transaction with regular queries // We expect the client to either start a transaction with regular queries
@@ -1107,7 +1110,25 @@ where
query_router.role(), query_router.role(),
err err
); );
checkout_failure_count += 1;
if let Some(limit) = pool.settings.checkout_failure_limit {
if checkout_failure_count >= limit {
error!(
"Checkout failure limit reached ({} / {}) - disconnecting client",
checkout_failure_count, limit
);
error_response_terminal(
&mut self.write,
&format!(
"checkout failure limit reached ({} / {})",
checkout_failure_count, limit
),
)
.await?;
self.stats.disconnect();
return Ok(());
}
}
continue; continue;
} }
}; };

View File

@@ -558,6 +558,14 @@ pub struct Pool {
/// Close idle connections that have been opened for longer than this. /// Close idle connections that have been opened for longer than this.
pub idle_timeout: Option<u64>, pub idle_timeout: Option<u64>,
/// Maximum number of checkout failures a client is allowed before it /// gets disconnected. This is needed to prevent persistent client/server
/// imbalance in high availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop,
/// especially in session mode.
pub checkout_failure_limit: Option<u64>,
/// Close server connections that have been opened for longer than this. /// Close server connections that have been opened for longer than this.
/// Only applied to idle connections. If the connection is actively used for /// Only applied to idle connections. If the connection is actively used for
/// longer than this period, the pool will not interrupt it. /// longer than this period, the pool will not interrupt it.
@@ -589,6 +597,19 @@ pub struct Pool {
#[serde(default = "Pool::default_prepared_statements_cache_size")] #[serde(default = "Pool::default_prepared_statements_cache_size")]
pub prepared_statements_cache_size: usize, pub prepared_statements_cache_size: usize,
// Support for query routing based on database activity
#[serde(default = "Pool::default_db_activity_based_routing")]
pub db_activity_based_routing: bool,
#[serde(default = "Pool::default_db_activity_init_delay")]
pub db_activity_init_delay: u64,
#[serde(default = "Pool::default_db_activity_ttl")]
pub db_activity_ttl: u64,
#[serde(default = "Pool::default_table_mutation_cache_ms_ttl")]
pub table_mutation_cache_ms_ttl: u64,
pub plugins: Option<Plugins>, pub plugins: Option<Plugins>,
pub shards: BTreeMap<String, Shard>, pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>, pub users: BTreeMap<String, User>,
@@ -642,6 +663,25 @@ impl Pool {
0 0
} }
pub fn default_db_activity_based_routing() -> bool {
false
}
pub fn default_db_activity_init_delay() -> u64 {
// 100 milliseconds
100
}
pub fn default_db_activity_ttl() -> u64 {
// 15 minutes
15 * 60
}
pub fn default_table_mutation_cache_ms_ttl() -> u64 {
// 50 milliseconds
50
}
pub fn validate(&mut self) -> Result<(), Error> { pub fn validate(&mut self) -> Result<(), Error> {
match self.default_role.as_ref() { match self.default_role.as_ref() {
"any" => (), "any" => (),
@@ -724,6 +764,23 @@ impl Pool {
user.validate()?; user.validate()?;
} }
if self.db_activity_based_routing {
if self.db_activity_init_delay == 0 {
error!("db_activity_init_delay must be greater than 0");
return Err(Error::BadConfig);
}
if self.table_mutation_cache_ms_ttl == 0 {
error!("table_mutation_cache_ms_ttl must be greater than 0");
return Err(Error::BadConfig);
}
if self.db_activity_ttl == 0 {
error!("db_activity_ttl must be greater than 0");
return Err(Error::BadConfig);
}
}
Ok(()) Ok(())
} }
} }
@@ -733,6 +790,7 @@ impl Default for Pool {
Pool { Pool {
pool_mode: Self::default_pool_mode(), pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(), load_balancing_mode: Self::default_load_balancing_mode(),
checkout_failure_limit: None,
default_role: String::from("any"), default_role: String::from("any"),
query_parser_enabled: false, query_parser_enabled: false,
query_parser_max_length: None, query_parser_max_length: None,
@@ -753,6 +811,10 @@ impl Default for Pool {
cleanup_server_connections: true, cleanup_server_connections: true,
log_client_parameter_status_changes: false, log_client_parameter_status_changes: false,
prepared_statements_cache_size: Self::default_prepared_statements_cache_size(), prepared_statements_cache_size: Self::default_prepared_statements_cache_size(),
db_activity_based_routing: Self::default_db_activity_based_routing(),
db_activity_init_delay: Self::default_db_activity_init_delay(),
db_activity_ttl: Self::default_db_activity_ttl(),
table_mutation_cache_ms_ttl: Self::default_table_mutation_cache_ms_ttl(),
plugins: None, plugins: None,
shards: BTreeMap::from([(String::from("1"), Shard::default())]), shards: BTreeMap::from([(String::from("1"), Shard::default())]),
users: BTreeMap::default(), users: BTreeMap::default(),
@@ -1245,6 +1307,17 @@ impl Config {
None => self.general.idle_timeout, None => self.general.idle_timeout,
}; };
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout); info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
match pool_config.checkout_failure_limit {
Some(checkout_failure_limit) => {
info!(
"[pool: {}] Checkout failure limit: {}",
pool_name, checkout_failure_limit
);
}
None => {
info!("[pool: {}] Checkout failure limit: not set", pool_name);
}
};
info!( info!(
"[pool: {}] Sharding function: {}", "[pool: {}] Sharding function: {}",
pool_name, pool_name,
@@ -1289,6 +1362,22 @@ impl Config {
"[pool: {}] Cleanup server connections: {}", "[pool: {}] Cleanup server connections: {}",
pool_name, pool_config.cleanup_server_connections pool_name, pool_config.cleanup_server_connections
); );
info!(
"[pool: {}] DB activity based routing: {}",
pool_name, pool_config.db_activity_based_routing
);
info!(
"[pool: {}] DB activity init delay: {}",
pool_name, pool_config.db_activity_init_delay
);
info!(
"[pool: {}] DB activity TTL: {}",
pool_name, pool_config.db_activity_ttl
);
info!(
"[pool: {}] Table mutation cache TTL: {}",
pool_name, pool_config.table_mutation_cache_ms_ttl
);
info!( info!(
"[pool: {}] Log client parameter status changes: {}", "[pool: {}] Log client parameter status changes: {}",
pool_name, pool_config.log_client_parameter_status_changes pool_name, pool_config.log_client_parameter_status_changes

View File

@@ -3,7 +3,7 @@ use tracing_subscriber;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
pub fn init(args: &Args) { pub fn init(args: &Args) {
// Iniitalize a default filter, and then override the builtin default "warning" with our // Initialize a default filter, and then override the builtin default "warning" with our
// commandline, (default: "info") // commandline, (default: "info")
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into()); let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());

View File

@@ -152,6 +152,14 @@ pub struct PoolSettings {
/// Random or LeastOutstandingConnections. /// Random or LeastOutstandingConnections.
pub load_balancing_mode: LoadBalancingMode, pub load_balancing_mode: LoadBalancingMode,
/// Maximum number of checkout failures a client is allowed before it
/// gets disconnected. This is needed to prevent persistent client/server
/// imbalance in high availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in a perpetual checkout failure loop,
/// especially in session mode.
pub checkout_failure_limit: Option<u64>,
// Number of shards. // Number of shards.
pub shards: usize, pub shards: usize,
@@ -174,6 +182,18 @@ pub struct PoolSettings {
// Read from the primary as well or not. // Read from the primary as well or not.
pub primary_reads_enabled: bool, pub primary_reads_enabled: bool,
// Automatic primary/replica selection based on recent activity.
pub db_activity_based_routing: bool,
// DB activity init delay
pub db_activity_init_delay: u64,
// DB activity TTL
pub db_activity_ttl: u64,
// Table mutation cache TTL
pub table_mutation_cache_ms_ttl: u64,
// Sharding function. // Sharding function.
pub sharding_function: ShardingFunction, pub sharding_function: ShardingFunction,
@@ -215,6 +235,7 @@ impl Default for PoolSettings {
PoolSettings { PoolSettings {
pool_mode: PoolMode::Transaction, pool_mode: PoolMode::Transaction,
load_balancing_mode: LoadBalancingMode::Random, load_balancing_mode: LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 1, shards: 1,
user: User::default(), user: User::default(),
db: String::default(), db: String::default(),
@@ -223,6 +244,10 @@ impl Default for PoolSettings {
query_parser_max_length: None, query_parser_max_length: None,
query_parser_read_write_splitting: false, query_parser_read_write_splitting: false,
primary_reads_enabled: true, primary_reads_enabled: true,
db_activity_based_routing: false,
db_activity_init_delay: 100,
db_activity_ttl: 15 * 60,
table_mutation_cache_ms_ttl: 50,
sharding_function: ShardingFunction::PgBigintHash, sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None, automatic_sharding_key: None,
healthcheck_delay: General::default_healthcheck_delay(), healthcheck_delay: General::default_healthcheck_delay(),
@@ -521,6 +546,7 @@ impl ConnectionPool {
None => pool_config.pool_mode, None => pool_config.pool_mode,
}, },
load_balancing_mode: pool_config.load_balancing_mode, load_balancing_mode: pool_config.load_balancing_mode,
checkout_failure_limit: pool_config.checkout_failure_limit,
// shards: pool_config.shards.clone(), // shards: pool_config.shards.clone(),
shards: shard_ids.len(), shards: shard_ids.len(),
user: user.clone(), user: user.clone(),
@@ -537,6 +563,10 @@ impl ConnectionPool {
.query_parser_read_write_splitting, .query_parser_read_write_splitting,
primary_reads_enabled: pool_config.primary_reads_enabled, primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function, sharding_function: pool_config.sharding_function,
db_activity_based_routing: pool_config.db_activity_based_routing,
db_activity_init_delay: pool_config.db_activity_init_delay,
db_activity_ttl: pool_config.db_activity_ttl,
table_mutation_cache_ms_ttl: pool_config.table_mutation_cache_ms_ttl,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(), automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
healthcheck_delay: config.general.healthcheck_delay, healthcheck_delay: config.general.healthcheck_delay,
healthcheck_timeout: config.general.healthcheck_timeout, healthcheck_timeout: config.general.healthcheck_timeout,

View File

@@ -2,6 +2,7 @@
/// or implied query characteristics. /// or implied query characteristics.
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use log::{debug, error}; use log::{debug, error};
use mini_moka::sync::Cache;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use regex::{Regex, RegexSet}; use regex::{Regex, RegexSet};
use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update}; use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update};
@@ -11,6 +12,7 @@ use sqlparser::ast::{
}; };
use sqlparser::dialect::PostgreSqlDialect; use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser; use sqlparser::parser::Parser;
use std::sync::OnceLock;
use crate::config::Role; use crate::config::Role;
use crate::errors::Error; use crate::errors::Error;
@@ -21,6 +23,7 @@ use crate::sharding::Sharder;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::io::Cursor; use std::io::Cursor;
use std::time::Duration;
use std::{cmp, mem}; use std::{cmp, mem};
/// Regexes used to parse custom commands. /// Regexes used to parse custom commands.
@@ -66,6 +69,18 @@ static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
// Get the value inside the custom command. // Get the value inside the custom command.
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new(); static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
#[derive(Debug, Clone, PartialEq)]
enum DatabaseActivityState {
Active,
Initializing,
}
// A moka cache for the databases
// the key is the database name and the value is the database activity state
static DATABASE_ACTIVITY_CACHE: OnceLock<Cache<String, DatabaseActivityState>> = OnceLock::new();
// A moka cache for the tables, the key is the db_table.
static TABLE_MUTATIONS_CACHE: OnceLock<Cache<String, bool>> = OnceLock::new();
/// The query router. /// The query router.
pub struct QueryRouter { pub struct QueryRouter {
/// Which shard we should be talking to right now. /// Which shard we should be talking to right now.
@@ -87,6 +102,12 @@ pub struct QueryRouter {
placeholders: Vec<i16>, placeholders: Vec<i16>,
} }
struct ExtractedExprsAndTables<'a> {
exprs: Vec<Expr>,
table_names: Vec<Vec<Ident>>,
assignments_opt: Option<&'a Vec<Assignment>>,
}
impl QueryRouter { impl QueryRouter {
/// One-time initialization of regexes /// One-time initialization of regexes
/// that parse our custom SQL protocol. /// that parse our custom SQL protocol.
@@ -398,6 +419,41 @@ impl QueryRouter {
} }
} }
fn database_activity_cache(&self) -> Cache<String, DatabaseActivityState> {
DATABASE_ACTIVITY_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_idle(Duration::from_secs(self.pool_settings.db_activity_ttl))
.build()
})
.clone()
}
/// Check database activity state and reset it if necessary
fn database_activity_state(&self, db: &String) -> DatabaseActivityState {
let cache = self.database_activity_cache();
// Exists in cache
if cache.contains_key(db) {
return cache.get(db).unwrap();
}
// Not in cache
debug!("Adding database to cache: {}", db);
cache.insert(db.to_string(), DatabaseActivityState::Initializing);
// Set a timer to update the cache
let db = db.clone();
let db_activity_init_delay = self.pool_settings.db_activity_init_delay;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(db_activity_init_delay)).await;
cache.insert(db, DatabaseActivityState::Active);
});
DatabaseActivityState::Initializing
}
/// Try to infer which server to connect to based on the contents of the query. /// Try to infer which server to connect to based on the contents of the query.
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> { pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
if !self.pool_settings.query_parser_read_write_splitting { if !self.pool_settings.query_parser_read_write_splitting {
@@ -412,9 +468,23 @@ impl QueryRouter {
return Err(Error::QueryRouterParserError("empty query".into())); return Err(Error::QueryRouterParserError("empty query".into()));
} }
let mut primary_set_based_on_activity = false;
let mut visited_write_statement = false; let mut visited_write_statement = false;
let mut prev_inferred_shard = None; let mut prev_inferred_shard = None;
if self.pool_settings.db_activity_based_routing {
let db = self.pool_settings.db.clone();
let state = self.database_activity_state(&db);
debug!("Database activity state: {:?}", state);
if let DatabaseActivityState::Initializing = state {
debug!("Database is initializing, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
}
}
for q in ast { for q in ast {
match q { match q {
// All transactions go to the primary, probably a write. // All transactions go to the primary, probably a write.
@@ -425,6 +495,22 @@ impl QueryRouter {
// Likely a read-only query // Likely a read-only query
Query(query) => { Query(query) => {
if primary_set_based_on_activity {
// If we already set the role based on activity, we don't need to do it again
continue;
}
if self.pool_settings.db_activity_based_routing {
// Check if the tables in the query have been written to recently
if self.query_handles_tables_in_mutation_cache(query) {
debug!("Query handles tables in mutation cache, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
continue;
}
}
match &self.pool_settings.automatic_sharding_key { match &self.pool_settings.automatic_sharding_key {
Some(_) => { Some(_) => {
// TODO: if we have multiple queries in the same message, // TODO: if we have multiple queries in the same message,
@@ -455,6 +541,13 @@ impl QueryRouter {
// Likely a write // Likely a write
_ => { _ => {
debug!("Write statement found, going to primary");
if self.pool_settings.db_activity_based_routing {
// add all of the query tables to the mutation cache
self.update_mutation_cache_on_write(q);
}
match &self.pool_settings.automatic_sharding_key { match &self.pool_settings.automatic_sharding_key {
Some(_) => { Some(_) => {
// TODO: similar to the above, if we have multiple queries in the // TODO: similar to the above, if we have multiple queries in the
@@ -497,62 +590,69 @@ impl QueryRouter {
Ok(()) Ok(())
} }
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> { fn table_mutations_cache(&self) -> Cache<String, bool> {
let mut exprs = Vec::new(); TABLE_MUTATIONS_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_live(Duration::from_millis(
self.pool_settings.table_mutation_cache_ms_ttl,
))
.build()
})
.clone()
}
// Collect all table names from the query. fn query_handles_tables_in_mutation_cache(&self, query: &sqlparser::ast::Query) -> bool {
let mut table_names = Vec::new(); let table_mutations_cache = self.table_mutations_cache();
debug!("Checking if query handles tables in mutation cache");
debug!("Table mutations cache: {:?}", table_mutations_cache);
match q { for tables in self.table_names(query) {
Insert { for table in tables {
or, if table_mutations_cache.contains_key(&self.table_mutation_cache_key(table)) {
into: _, return true;
table_name,
columns,
overwrite: _,
source,
partitioned,
after_columns,
table: _,
on: _,
returning: _,
ignore: _,
} => {
// Not supported in postgres.
assert!(or.is_none());
assert!(partitioned.is_none());
assert!(after_columns.is_empty());
Self::process_table(table_name, &mut table_names);
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
} }
} }
Delete { }
tables,
from, false
using, }
selection, fn extract_exprs_and_table_names<'a>(
returning: _, &'a self,
order_by: _, q: &'a Statement,
limit: _, ) -> Option<ExtractedExprsAndTables<'a>> {
} => { let mut exprs = Vec::new();
if let Some(expr) = selection { let mut table_names = Vec::new();
let mut assignments_opt = None;
match q {
Insert(i) => {
// Not supported in postgres.
assert!(i.or.is_none());
assert!(i.partitioned.is_none());
assert!(i.after_columns.is_empty());
Self::process_table(&i.table_name, &mut table_names);
if let Some(source) = &i.source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns));
}
}
Delete(d) => {
if let Some(expr) = &d.selection {
exprs.push(expr.clone()); exprs.push(expr.clone());
} }
// Multi tables delete are not supported in postgres. // Multi-tables delete are not supported in postgres.
assert!(tables.is_empty()); assert!(d.tables.is_empty());
Self::process_tables_with_join(from, &mut exprs, &mut table_names); if let Some(using_tbl_with_join) = &d.using {
if let Some(using_tbl_with_join) = using {
Self::process_tables_with_join( Self::process_tables_with_join(
using_tbl_with_join, using_tbl_with_join,
&mut exprs, &mut exprs,
&mut table_names, &mut table_names,
); );
} }
Self::process_selection(selection, &mut exprs); Self::process_selection(&d.selection, &mut exprs);
} }
Update { Update {
table, table,
@@ -566,14 +666,55 @@ impl QueryRouter {
Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names); Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names);
} }
Self::process_selection(selection, &mut exprs); Self::process_selection(selection, &mut exprs);
self.assignment_parser(assignments)?;
} assignments_opt = Some(assignments);
_ => {
return Ok(None);
} }
_ => return None,
}; };
Ok(self.infer_shard_from_exprs(exprs, table_names)) Some(ExtractedExprsAndTables {
exprs,
table_names,
assignments_opt,
})
}
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
let exprs = extracted.exprs;
let table_names = extracted.table_names;
let assignments_opt = extracted.assignments_opt;
if let Some(assignments) = assignments_opt {
self.assignment_parser(assignments)?;
}
Ok(self.infer_shard_from_exprs(exprs, table_names))
} else {
Ok(None)
}
}
fn update_mutation_cache_on_write(&self, q: &Statement) {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
debug!("Updating mutation cache on write");
let table_names = extracted.table_names;
debug!("Table names in mutation query: {:?}", table_names);
let table_mutations_cache = self.table_mutations_cache();
for tables in table_names {
for table in tables {
table_mutations_cache.insert(self.table_mutation_cache_key(table), true);
}
}
}
}
// combines the database name and table name into a single string
// to be used as the key in the table mutation cache
// e.g. "mydb.mytable"
fn table_mutation_cache_key(&self, table: Ident) -> String {
format!("{}.{}", self.pool_settings.db, table.value)
} }
fn process_query( fn process_query(
@@ -822,7 +963,13 @@ impl QueryRouter {
for a in assignments { for a in assignments {
if sharding_key[0].value == "*" if sharding_key[0].value == "*"
&& sharding_key[1].value == a.id.last().unwrap().value.to_lowercase() && sharding_key[1].value
== a.target
.to_string()
.split('.')
.last()
.unwrap()
.to_lowercase()
{ {
return Err(Error::QueryRouterParserError( return Err(Error::QueryRouterParserError(
"Sharding key cannot be updated.".into(), "Sharding key cannot be updated.".into(),
@@ -955,6 +1102,18 @@ impl QueryRouter {
self.infer_shard_from_exprs(exprs, table_names) self.infer_shard_from_exprs(exprs, table_names)
} }
/// get table names from query
fn table_names(&self, query: &sqlparser::ast::Query) -> Vec<Vec<Ident>> {
let mut exprs = Vec::new();
let mut table_names = Vec::new();
Self::process_query(query, &mut exprs, &mut table_names, &None);
debug!("Table names in query: {:?}", table_names);
table_names
}
fn infer_shard_from_exprs( fn infer_shard_from_exprs(
&mut self, &mut self,
exprs: Vec<Expr>, exprs: Vec<Expr>,
@@ -1061,6 +1220,11 @@ impl QueryRouter {
self.active_shard self.active_shard
} }
/// Set active_role as the default_role specified in the pool.
pub fn set_default_role(&mut self) {
self.active_role = self.pool_settings.default_role;
}
/// Get the current desired server role we should be talking to. /// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> { pub fn role(&self) -> Option<Role> {
self.active_role self.active_role
@@ -1117,6 +1281,7 @@ mod test {
use crate::messages::simple_query; use crate::messages::simple_query;
use crate::sharding::ShardingFunction; use crate::sharding::ShardingFunction;
use bytes::BufMut; use bytes::BufMut;
use serial_test::serial;
#[test] #[test]
fn test_defaults() { fn test_defaults() {
@@ -1452,6 +1617,7 @@ mod test {
let pool_settings = PoolSettings { let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction, pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random, load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 2, shards: 2,
user: crate::config::User::default(), user: crate::config::User::default(),
default_role: Some(Role::Replica), default_role: Some(Role::Replica),
@@ -1472,6 +1638,10 @@ mod test {
auth_query_password: None, auth_query_password: None,
auth_query_user: None, auth_query_user: None,
db: "test".to_string(), db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None, plugins: None,
}; };
let mut qr = QueryRouter::new(); let mut qr = QueryRouter::new();
@@ -1530,6 +1700,7 @@ mod test {
let pool_settings = PoolSettings { let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction, pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random, load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: Some(10),
shards: 5, shards: 5,
user: crate::config::User::default(), user: crate::config::User::default(),
default_role: Some(Role::Replica), default_role: Some(Role::Replica),
@@ -1550,6 +1721,10 @@ mod test {
auth_query_password: None, auth_query_password: None,
auth_query_user: None, auth_query_user: None,
db: "test".to_string(), db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None, plugins: None,
}; };
@@ -1965,4 +2140,150 @@ mod test {
assert_eq!(res, Ok(PluginOutput::Allow)); assert_eq!(res, Ok(PluginOutput::Allow));
} }
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_initializing_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Initially, the database activity should be in the "Initializing" state
let state = qr.database_activity_state(&qr.pool_settings.db.clone());
assert_eq!(state, DatabaseActivityState::Initializing);
// Check that the router chooses the primary role due to "Initializing" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_active_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
let db_name = qr.pool_settings.db.clone();
let cache = qr.database_activity_cache();
cache.insert(db_name.clone(), DatabaseActivityState::Active);
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Check that the router can choose a replica role when in "Active" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), None); // Default should allow replica due to active state
}
#[tokio::test]
#[serial]
async fn test_table_mutation_cache_on_write() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.table_mutation_cache_ms_ttl = 20_000; // 20 seconds in milliseconds
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("UPDATE some_table SET col1 = 'value' WHERE col2 = 1");
let ast = qr.parse(&query).unwrap();
// Simulate the mutation query which should populate the mutation cache
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
let table_cache_key = qr.table_mutation_cache_key(Ident::new("some_table"));
let cache = qr.table_mutations_cache();
// Ensure the table mutation cache contains the table with recent write
assert!(cache.contains_key(&table_cache_key));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_multi_query() {
use super::*;
use crate::messages::simple_query;
use tokio::time::Duration;
QueryRouter::setup();
let mut qr = QueryRouter::new();
// Configure the pool settings for db_activity_based_routing
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.db = "test_db_activity_routing".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
// First query when database is initializing
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
// Should route to primary because database is initializing
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the initialization delay to pass
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.db_activity_init_delay * 2,
))
.await;
// Next query after database is active
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because database is active and no recent mutations
assert_eq!(qr.role(), None);
// Simulate a write query to update the mutation cache
let query = simple_query("INSERT INTO test_table (id, name) VALUES (1, 'test')");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because it's a write operation
assert_eq!(qr.role(), Some(Role::Primary));
// Immediately run a read query on the same table
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because the table was recently mutated
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the mutation cache TTL to expire
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.table_mutation_cache_ms_ttl * 2,
))
.await;
// Run the read query again after cache expiration
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because mutation cache has expired
assert_eq!(qr.role(), None);
}
} }

View File

@@ -56,6 +56,41 @@ describe "Random Load Balancing" do
end end
end end
end end
context "when all replicas are down " do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length
# Take down all replicas
processes[:replicas].each(&:take_down)
(number_of_replicas + 1).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(number_of_replicas + 1)
failed_count = 0
# Ban_time is configured to 60 so this reset will only work
# if the replicas are unbanned automatically
processes[:replicas].each(&:reset)
number_of_replicas.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(0)
end
end
end end
describe "Least Outstanding Queries Load Balancing" do describe "Least Outstanding Queries Load Balancing" do
@@ -161,4 +196,3 @@ describe "Least Outstanding Queries Load Balancing" do
end end
end end
end end

View File

@@ -188,6 +188,102 @@ describe "Miscellaneous" do
end end
end end
describe "Checkout failure limit" do
  # Shrink the pool to a single server connection with a short
  # connect_timeout so concurrent clients reliably hit checkout failures.
  # Optionally sets the pool-level checkout_failure_limit.
  def configure_tiny_pool(checkout_failure_limit: nil)
    new_configs = processes.pgcat.current_config
    new_configs["general"]["connect_timeout"] = 200
    new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
    new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = checkout_failure_limit unless checkout_failure_limit.nil?
    processes.pgcat.update_config(new_configs)
    processes.pgcat.reload_config
    sleep 0.5
  end

  # Run 5 concurrent clients, each issuing 5 queries that contend for the
  # single pooled connection; every checkout timeout (PG::SystemError) must
  # leave the client session usable.
  def expect_clients_stay_connected
    Array.new(5) do
      Thread.new do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        5.times do
          conn.async_exec("SELECT pg_sleep(0.5);")
          expect(conn.status).to eq(PG::CONNECTION_OK)
        rescue PG::SystemError
          # Checkout failed, but the client must not be disconnected.
          expect(conn.status).to eq(PG::CONNECTION_OK)
        end
        conn.close
      end
    end.each(&:join)
  end

  context "when no checkout failure limit is set" do
    before { configure_tiny_pool }

    it "does not disconnect client" do
      expect_clients_stay_connected
    end
  end

  context "when checkout failure limit is set high" do
    before { configure_tiny_pool(checkout_failure_limit: 10000) }

    it "does not disconnect client" do
      expect_clients_stay_connected
    end
  end

  context "when checkout failure limit is set low" do
    before { configure_tiny_pool(checkout_failure_limit: 2) }

    it "disconnects client after reaching limit" do
      Array.new(5) do
        Thread.new do
          conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          checkout_failure_count = 0
          5.times do
            conn.async_exec("SELECT pg_sleep(1);")
            expect(conn.status).to eq(PG::CONNECTION_OK)
          rescue PG::SystemError
            # A checkout failure below the limit keeps the client alive.
            checkout_failure_count += 1
            expect(conn.status).to eq(PG::CONNECTION_OK)
          rescue PG::ConnectionBad
            # After the configured limit (2) is exceeded, pgcat drops the client.
            expect(checkout_failure_count).to eq(2)
            expect(conn.status).to eq(PG::CONNECTION_BAD)
            break
          end
          conn.close
        end
      end.each(&:join)
    end
  end
end
describe "Server version reporting" do describe "Server version reporting" do
it "reports correct version for normal and admin databases" do it "reports correct version for normal and admin databases" do
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))