mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
1 Commits
mostafa_ch
...
circleci_O
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16a2cece21 |
53
CONFIG.md
53
CONFIG.md
@@ -298,19 +298,6 @@ Load balancing mode
|
||||
`random` selects the server at random
|
||||
`loc` selects the server with the least outstanding busy connections
|
||||
|
||||
### checkout_failure_limit
|
||||
```
|
||||
path: pools.<pool_name>.checkout_failure_limit
|
||||
default: 0 (disabled)
|
||||
```
|
||||
|
||||
`Maximum number of checkout failures a client is allowed before it
|
||||
gets disconnected. This is needed to prevent persistent client/server
|
||||
imbalance in high availability setups where multiple PgCat instances are placed
|
||||
behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
a large number of connected clients, it might get stuck in a perpetual checkout failure loop, especially
|
||||
in session mode
|
||||
`
|
||||
### default_role
|
||||
```
|
||||
path: pools.<pool_name>.default_role
|
||||
@@ -322,44 +309,14 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
|
||||
`replica` round-robin between replicas only without touching the primary,
|
||||
`primary` all queries go to the primary unless otherwise specified.
|
||||
|
||||
### db_activity_based_routing
|
||||
### replica_to_primary_failover_enabled
|
||||
```
|
||||
path: pools.<pool_name>.db_activity_based_routing
|
||||
default: false
|
||||
path: pools.<pool_name>.replica_to_primary_failover_enabled
|
||||
default: "false"
|
||||
```
|
||||
|
||||
If enabled, PgCat will route queries to the primary if the queried table was recently written to.
|
||||
Only relevant when both `query_parser_enabled` *and* `query_parser_read_write_splitting` are enabled.
|
||||
|
||||
##### Considerations:
|
||||
- *This feature is experimental and may not work as expected.*
|
||||
- This feature only works when the same PgCat instance is used for both reads and writes to the database.
|
||||
- This feature is not relevant when the primary is not part of the pool of databases used for load balancing of read queries.
|
||||
- If more than one PgCat instance is used for HA purposes, this feature will not work as expected. A way to still make it work is by using sticky sessions.
|
||||
|
||||
### db_activity_based_ms_init_delay
|
||||
```
|
||||
path: pools.<pool_name>.db_activity_based_ms_init_delay
|
||||
default: 100
|
||||
```
|
||||
|
||||
The delay in milliseconds before the first activity-based routing check is performed.
|
||||
|
||||
### db_activity_ttl
|
||||
```
|
||||
path: pools.<pool_name>.db_activity_ttl
|
||||
default: 900
|
||||
```
|
||||
|
||||
The time in seconds after which a DB is considered inactive when no queries/updates are performed on it.
|
||||
|
||||
### table_mutation_cache_ms_ttl
|
||||
```
|
||||
path: pools.<pool_name>.table_mutation_cache_ms_ttl
|
||||
default: 50
|
||||
```
|
||||
|
||||
The time in milliseconds after a write to a table that all queries to that table will be routed to the primary.
|
||||
If set to true, when the specified role is `replica` (either by setting `default_role` or manually)
|
||||
and all replicas are banned, queries will be sent to the primary (until a replica is back online).
|
||||
|
||||
### prepared_statements_cache_size
|
||||
```
|
||||
|
||||
293
Cargo.lock
generated
293
Cargo.lock
generated
@@ -132,7 +132,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -143,7 +143,7 @@ checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -229,12 +229,6 @@ version = "3.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
|
||||
|
||||
[[package]]
|
||||
name = "bytecount"
|
||||
version = "0.6.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
version = "1.4.3"
|
||||
@@ -247,37 +241,6 @@ version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
|
||||
|
||||
[[package]]
|
||||
name = "camino"
|
||||
version = "1.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cargo-platform"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cargo_metadata"
|
||||
version = "0.14.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
|
||||
dependencies = [
|
||||
"camino",
|
||||
"cargo-platform",
|
||||
"semver",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.0.79"
|
||||
@@ -337,7 +300,7 @@ dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -367,21 +330,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
|
||||
|
||||
[[package]]
|
||||
name = "crypto-common"
|
||||
version = "0.1.6"
|
||||
@@ -392,19 +340,6 @@ dependencies = [
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "data-encoding"
|
||||
version = "2.4.0"
|
||||
@@ -467,15 +402,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "error-chain"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
|
||||
dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "exitcode"
|
||||
version = "1.1.2"
|
||||
@@ -488,12 +414,6 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
|
||||
|
||||
[[package]]
|
||||
name = "fastrand"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
@@ -565,7 +485,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -625,12 +545,6 @@ version = "0.27.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
|
||||
|
||||
[[package]]
|
||||
name = "glob"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.4.6"
|
||||
@@ -1004,21 +918,6 @@ dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mini-moka"
|
||||
version = "0.10.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"dashmap",
|
||||
"skeptic",
|
||||
"smallvec",
|
||||
"tagptr",
|
||||
"triomphe",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.7.1"
|
||||
@@ -1093,9 +992,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.20.2"
|
||||
version = "1.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
|
||||
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
@@ -1134,7 +1033,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
|
||||
|
||||
[[package]]
|
||||
name = "pgcat"
|
||||
version = "1.3.0"
|
||||
version = "1.2.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
@@ -1156,7 +1055,6 @@ dependencies = [
|
||||
"log",
|
||||
"lru",
|
||||
"md-5",
|
||||
"mini-moka",
|
||||
"nix",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
@@ -1171,7 +1069,6 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"serial_test",
|
||||
"sha-1",
|
||||
"sha2",
|
||||
"socket2 0.4.9",
|
||||
@@ -1217,7 +1114,7 @@ dependencies = [
|
||||
"phf_shared",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1246,7 +1143,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1287,24 +1184,13 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.89"
|
||||
version = "1.0.66"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
|
||||
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pulldown-cmark"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
|
||||
dependencies = [
|
||||
"bitflags 2.3.3",
|
||||
"memchr",
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-error"
|
||||
version = "1.2.3"
|
||||
@@ -1313,9 +1199,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.37"
|
||||
version = "1.0.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
|
||||
checksum = "5fe8a65d69dd0808184ebb5f836ab526bb259db23c657efa38711b1072ee47f0"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
@@ -1494,24 +1380,6 @@ version = "1.0.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
|
||||
|
||||
[[package]]
|
||||
name = "same-file"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
|
||||
dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scc"
|
||||
version = "2.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d8d25269dd3a12467afe2e510f69fb0b46b698e5afb296b59f2145259deaf8e8"
|
||||
dependencies = [
|
||||
"sdd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
@@ -1528,39 +1396,24 @@ dependencies = [
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sdd"
|
||||
version = "3.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49c1eeaf4b6a87c7479688c6d52b9f1153cedd3c489300564f932b065c6eab95"
|
||||
|
||||
[[package]]
|
||||
name = "semver"
|
||||
version = "1.0.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.214"
|
||||
version = "1.0.171"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5"
|
||||
checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.214"
|
||||
version = "1.0.171"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766"
|
||||
checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1583,31 +1436,6 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test"
|
||||
version = "3.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b4b487fe2acf240a021cf57c6b2b4903b1e78ca0ecd862a71b71d2a51fed77d"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"scc",
|
||||
"serial_test_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test_derive"
|
||||
version = "3.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha-1"
|
||||
version = "0.10.1"
|
||||
@@ -1654,21 +1482,6 @@ version = "0.3.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
|
||||
|
||||
[[package]]
|
||||
name = "skeptic"
|
||||
version = "0.13.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
|
||||
dependencies = [
|
||||
"bytecount",
|
||||
"cargo_metadata",
|
||||
"error-chain",
|
||||
"glob",
|
||||
"pulldown-cmark",
|
||||
"tempfile",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.8"
|
||||
@@ -1712,9 +1525,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.52.0"
|
||||
version = "0.41.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08"
|
||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
||||
dependencies = [
|
||||
"log",
|
||||
"sqlparser_derive",
|
||||
@@ -1728,7 +1541,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1772,34 +1585,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.87"
|
||||
version = "2.0.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
|
||||
checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tagptr"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"fastrand",
|
||||
"redox_syscall",
|
||||
"rustix",
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.43"
|
||||
@@ -1817,7 +1611,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1884,7 +1678,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1989,7 +1783,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2044,12 +1838,6 @@ dependencies = [
|
||||
"tracing-serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "triomphe"
|
||||
version = "0.1.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3"
|
||||
|
||||
[[package]]
|
||||
name = "trust-dns-proto"
|
||||
version = "0.22.0"
|
||||
@@ -2107,12 +1895,6 @@ version = "1.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-bidi"
|
||||
version = "0.3.13"
|
||||
@@ -2169,16 +1951,6 @@ version = "0.9.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
||||
|
||||
[[package]]
|
||||
name = "walkdir"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
|
||||
dependencies = [
|
||||
"same-file",
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "want"
|
||||
version = "0.3.1"
|
||||
@@ -2221,7 +1993,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
@@ -2243,7 +2015,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"syn 2.0.26",
|
||||
"wasm-bindgen-backend",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
@@ -2295,15 +2067,6 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
|
||||
dependencies = [
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "pgcat"
|
||||
version = "1.3.0"
|
||||
version = "1.2.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
sqlparser = { version = "0.52", features = ["visitor"] }
|
||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
parking_lot = "0.12.1"
|
||||
@@ -55,10 +55,6 @@ tracing-subscriber = { version = "0.3.17", features = [
|
||||
"std",
|
||||
] }
|
||||
lru = "0.12.0"
|
||||
mini-moka = "0.10.3"
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
jemallocator = "0.5.0"
|
||||
|
||||
[dev-dependencies]
|
||||
serial_test = "*"
|
||||
|
||||
@@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
|
||||
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
|
||||
|
||||
### Failover
|
||||
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
||||
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If `replica_to_primary_failover_enabled` is set to true and all replicas become banned, the query will be routed to the primary. If `replica_to_primary_failover_enabled` is false and all servers (replicas) become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
||||
|
||||
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
||||
|
||||
@@ -231,7 +231,7 @@ User.find_by_email("test@example.com")
|
||||
```sql
|
||||
-- Grab a bunch of users from shard 1
|
||||
SET SHARD TO '1';
|
||||
SELECT * FROM users LIMIT 10;
|
||||
SELECT * FROM users LIMT 10;
|
||||
|
||||
-- Find by id
|
||||
SET SHARDING KEY TO '1234';
|
||||
|
||||
@@ -2,7 +2,7 @@ apiVersion: v2
|
||||
name: pgcat
|
||||
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
||||
maintainers:
|
||||
- name: PostgresML
|
||||
email: team@postgresml.org
|
||||
appVersion: "1.3.0"
|
||||
version: 0.2.5
|
||||
- name: Wildcard
|
||||
email: support@w6d.io
|
||||
appVersion: "1.2.0"
|
||||
version: 0.2.4
|
||||
|
||||
@@ -51,10 +51,6 @@ stringData:
|
||||
query_parser_enabled = {{ default true $pool.query_parser_enabled }}
|
||||
query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }}
|
||||
primary_reads_enabled = {{ default true $pool.primary_reads_enabled }}
|
||||
db_activity_based_routing = {{ default false $pool.db_activity_based_routing }}
|
||||
db_activity_based_ms_init_delay = {{ default 100 $pool.db_activity_based_ms_init_delay }}
|
||||
db_activity_ttl = {{ default 900 $pool.db_activity_ttl }}
|
||||
table_mutation_cache_ttl = {{ default 50 $pool.table_mutation_cache_ttl }}
|
||||
sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }}
|
||||
|
||||
{{- range $index, $user := $pool.users }}
|
||||
|
||||
@@ -298,22 +298,6 @@ configuration:
|
||||
# ## @param configuration.poolsPostgres.query_parser_read_write_splitting
|
||||
# query_parser_read_write_splitting: true
|
||||
|
||||
# ## Db activity based routing. If enabled, we'll route queries to the primary if the table was recently mutated.
|
||||
# ## @param configuration.poolsPostgres.db_activity_based_routing
|
||||
# db_activity_based_routing: false
|
||||
|
||||
# ## DB activity based init delay. How long to wait before starting to route queries to the primary after a table mutation.
|
||||
# ## @param configuration.poolsPostgres.db_activity_based_ms_init_delay
|
||||
# db_activity_based_ms_init_delay: 100
|
||||
|
||||
# ## DB activity TTL. How long before marking the DB as inactive after no mutations or queries.
|
||||
# ## @param configuration.poolsPostgres.db_activity_ttl
|
||||
# db_activity_ttl: 900
|
||||
|
||||
# ## Table mutation cache TTL. How long to keep track of table mutations.
|
||||
# ## @param configuration.poolsPostgres.table_mutation_cache_ttl
|
||||
# table_mutation_cache_ttl: 50
|
||||
|
||||
# ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
|
||||
# ## load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
# ## queries. The primary can always be explicitly selected with our custom protocol.
|
||||
|
||||
@@ -859,8 +859,6 @@ where
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
|
||||
let mut checkout_failure_count: u64 = 0;
|
||||
|
||||
self.stats.register(self.stats.clone());
|
||||
|
||||
// Result returned by one of the plugins.
|
||||
@@ -1110,25 +1108,7 @@ where
|
||||
query_router.role(),
|
||||
err
|
||||
);
|
||||
checkout_failure_count += 1;
|
||||
if let Some(limit) = pool.settings.checkout_failure_limit {
|
||||
if checkout_failure_count >= limit {
|
||||
error!(
|
||||
"Checkout failure limit reached ({} / {}) - disconnecting client",
|
||||
checkout_failure_count, limit
|
||||
);
|
||||
error_response_terminal(
|
||||
&mut self.write,
|
||||
&format!(
|
||||
"checkout failure limit reached ({} / {})",
|
||||
checkout_failure_count, limit
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
self.stats.disconnect();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -541,6 +541,9 @@ pub struct Pool {
|
||||
#[serde(default = "Pool::default_default_role")]
|
||||
pub default_role: String,
|
||||
|
||||
#[serde(default)] // False
|
||||
pub replica_to_primary_failover_enabled: bool,
|
||||
|
||||
#[serde(default)] // False
|
||||
pub query_parser_enabled: bool,
|
||||
|
||||
@@ -558,14 +561,6 @@ pub struct Pool {
|
||||
/// Close idle connections that have been opened for longer than this.
|
||||
pub idle_timeout: Option<u64>,
|
||||
|
||||
/// Maximum number of checkout failures a client is allowed before it
|
||||
/// gets disconnected. This is needed to prevent persistent client/server
|
||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
|
||||
/// in session mode
|
||||
pub checkout_failure_limit: Option<u64>,
|
||||
|
||||
/// Close server connections that have been opened for longer than this.
|
||||
/// Only applied to idle connections. If the connection is actively used for
|
||||
/// longer than this period, the pool will not interrupt it.
|
||||
@@ -597,19 +592,6 @@ pub struct Pool {
|
||||
#[serde(default = "Pool::default_prepared_statements_cache_size")]
|
||||
pub prepared_statements_cache_size: usize,
|
||||
|
||||
// Support for query routing based on database activity
|
||||
#[serde(default = "Pool::default_db_activity_based_routing")]
|
||||
pub db_activity_based_routing: bool,
|
||||
|
||||
#[serde(default = "Pool::default_db_activity_init_delay")]
|
||||
pub db_activity_init_delay: u64,
|
||||
|
||||
#[serde(default = "Pool::default_db_activity_ttl")]
|
||||
pub db_activity_ttl: u64,
|
||||
|
||||
#[serde(default = "Pool::default_table_mutation_cache_ms_ttl")]
|
||||
pub table_mutation_cache_ms_ttl: u64,
|
||||
|
||||
pub plugins: Option<Plugins>,
|
||||
pub shards: BTreeMap<String, Shard>,
|
||||
pub users: BTreeMap<String, User>,
|
||||
@@ -663,25 +645,6 @@ impl Pool {
|
||||
0
|
||||
}
|
||||
|
||||
pub fn default_db_activity_based_routing() -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn default_db_activity_init_delay() -> u64 {
|
||||
// 100 milliseconds
|
||||
100
|
||||
}
|
||||
|
||||
pub fn default_db_activity_ttl() -> u64 {
|
||||
// 15 minutes
|
||||
15 * 60
|
||||
}
|
||||
|
||||
pub fn default_table_mutation_cache_ms_ttl() -> u64 {
|
||||
// 50 milliseconds
|
||||
50
|
||||
}
|
||||
|
||||
pub fn validate(&mut self) -> Result<(), Error> {
|
||||
match self.default_role.as_ref() {
|
||||
"any" => (),
|
||||
@@ -764,23 +727,6 @@ impl Pool {
|
||||
user.validate()?;
|
||||
}
|
||||
|
||||
if self.db_activity_based_routing {
|
||||
if self.db_activity_init_delay == 0 {
|
||||
error!("db_activity_init_delay must be greater than 0");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
if self.table_mutation_cache_ms_ttl == 0 {
|
||||
error!("table_mutation_cache_ms_ttl must be greater than 0");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
if self.db_activity_ttl == 0 {
|
||||
error!("db_activity_ttl must be greater than 0");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -790,8 +736,8 @@ impl Default for Pool {
|
||||
Pool {
|
||||
pool_mode: Self::default_pool_mode(),
|
||||
load_balancing_mode: Self::default_load_balancing_mode(),
|
||||
checkout_failure_limit: None,
|
||||
default_role: String::from("any"),
|
||||
replica_to_primary_failover_enabled: false,
|
||||
query_parser_enabled: false,
|
||||
query_parser_max_length: None,
|
||||
query_parser_read_write_splitting: false,
|
||||
@@ -811,10 +757,6 @@ impl Default for Pool {
|
||||
cleanup_server_connections: true,
|
||||
log_client_parameter_status_changes: false,
|
||||
prepared_statements_cache_size: Self::default_prepared_statements_cache_size(),
|
||||
db_activity_based_routing: Self::default_db_activity_based_routing(),
|
||||
db_activity_init_delay: Self::default_db_activity_init_delay(),
|
||||
db_activity_ttl: Self::default_db_activity_ttl(),
|
||||
table_mutation_cache_ms_ttl: Self::default_table_mutation_cache_ms_ttl(),
|
||||
plugins: None,
|
||||
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
|
||||
users: BTreeMap::default(),
|
||||
@@ -1307,17 +1249,6 @@ impl Config {
|
||||
None => self.general.idle_timeout,
|
||||
};
|
||||
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
|
||||
match pool_config.checkout_failure_limit {
|
||||
Some(checkout_failure_limit) => {
|
||||
info!(
|
||||
"[pool: {}] Checkout failure limit: {}",
|
||||
pool_name, checkout_failure_limit
|
||||
);
|
||||
}
|
||||
None => {
|
||||
info!("[pool: {}] Checkout failure limit: not set", pool_name);
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"[pool: {}] Sharding function: {}",
|
||||
pool_name,
|
||||
@@ -1362,22 +1293,6 @@ impl Config {
|
||||
"[pool: {}] Cleanup server connections: {}",
|
||||
pool_name, pool_config.cleanup_server_connections
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] DB activity based routing: {}",
|
||||
pool_name, pool_config.db_activity_based_routing
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] DB activity init delay: {}",
|
||||
pool_name, pool_config.db_activity_init_delay
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] DB activity TTL: {}",
|
||||
pool_name, pool_config.db_activity_ttl
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Table mutation cache TTL: {}",
|
||||
pool_name, pool_config.table_mutation_cache_ms_ttl
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Log client parameter status changes: {}",
|
||||
pool_name, pool_config.log_client_parameter_status_changes
|
||||
|
||||
@@ -3,7 +3,7 @@ use tracing_subscriber;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
pub fn init(args: &Args) {
|
||||
// Initialize a default filter, and then override the builtin default "warning" with our
|
||||
// Iniitalize a default filter, and then override the builtin default "warning" with our
|
||||
// commandline, (default: "info")
|
||||
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());
|
||||
|
||||
|
||||
81
src/pool.rs
81
src/pool.rs
@@ -152,14 +152,6 @@ pub struct PoolSettings {
|
||||
/// Random or LeastOutstandingConnections.
|
||||
pub load_balancing_mode: LoadBalancingMode,
|
||||
|
||||
/// Maximum number of checkout failures a client is allowed before it
|
||||
/// gets disconnected. This is needed to prevent persistent client/server
|
||||
/// imbalance in high availability setups where multiple PgCat instances are placed
|
||||
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
|
||||
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
|
||||
/// in session mode
|
||||
pub checkout_failure_limit: Option<u64>,
|
||||
|
||||
// Number of shards.
|
||||
pub shards: usize,
|
||||
|
||||
@@ -170,6 +162,9 @@ pub struct PoolSettings {
|
||||
// Default server role to connect to.
|
||||
pub default_role: Option<Role>,
|
||||
|
||||
// Whether or not we should use primary when replicas are unavailable
|
||||
pub replica_to_primary_failover_enabled: bool,
|
||||
|
||||
// Enable/disable query parser.
|
||||
pub query_parser_enabled: bool,
|
||||
|
||||
@@ -182,18 +177,6 @@ pub struct PoolSettings {
|
||||
// Read from the primary as well or not.
|
||||
pub primary_reads_enabled: bool,
|
||||
|
||||
// Automatic primary/replica selection based on recent activity.
|
||||
pub db_activity_based_routing: bool,
|
||||
|
||||
// DB activity init delay
|
||||
pub db_activity_init_delay: u64,
|
||||
|
||||
// DB activity TTL
|
||||
pub db_activity_ttl: u64,
|
||||
|
||||
// Table mutation cache TTL
|
||||
pub table_mutation_cache_ms_ttl: u64,
|
||||
|
||||
// Sharding function.
|
||||
pub sharding_function: ShardingFunction,
|
||||
|
||||
@@ -235,19 +218,15 @@ impl Default for PoolSettings {
|
||||
PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: LoadBalancingMode::Random,
|
||||
checkout_failure_limit: None,
|
||||
shards: 1,
|
||||
user: User::default(),
|
||||
db: String::default(),
|
||||
default_role: None,
|
||||
replica_to_primary_failover_enabled: false,
|
||||
query_parser_enabled: false,
|
||||
query_parser_max_length: None,
|
||||
query_parser_read_write_splitting: false,
|
||||
primary_reads_enabled: true,
|
||||
db_activity_based_routing: false,
|
||||
db_activity_init_delay: 100,
|
||||
db_activity_ttl: 15 * 60,
|
||||
table_mutation_cache_ms_ttl: 50,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
automatic_sharding_key: None,
|
||||
healthcheck_delay: General::default_healthcheck_delay(),
|
||||
@@ -546,7 +525,6 @@ impl ConnectionPool {
|
||||
None => pool_config.pool_mode,
|
||||
},
|
||||
load_balancing_mode: pool_config.load_balancing_mode,
|
||||
checkout_failure_limit: pool_config.checkout_failure_limit,
|
||||
// shards: pool_config.shards.clone(),
|
||||
shards: shard_ids.len(),
|
||||
user: user.clone(),
|
||||
@@ -557,16 +535,14 @@ impl ConnectionPool {
|
||||
"primary" => Some(Role::Primary),
|
||||
_ => unreachable!(),
|
||||
},
|
||||
replica_to_primary_failover_enabled: pool_config
|
||||
.replica_to_primary_failover_enabled,
|
||||
query_parser_enabled: pool_config.query_parser_enabled,
|
||||
query_parser_max_length: pool_config.query_parser_max_length,
|
||||
query_parser_read_write_splitting: pool_config
|
||||
.query_parser_read_write_splitting,
|
||||
primary_reads_enabled: pool_config.primary_reads_enabled,
|
||||
sharding_function: pool_config.sharding_function,
|
||||
db_activity_based_routing: pool_config.db_activity_based_routing,
|
||||
db_activity_init_delay: pool_config.db_activity_init_delay,
|
||||
db_activity_ttl: pool_config.db_activity_ttl,
|
||||
table_mutation_cache_ms_ttl: pool_config.table_mutation_cache_ms_ttl,
|
||||
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
|
||||
healthcheck_delay: config.general.healthcheck_delay,
|
||||
healthcheck_timeout: config.general.healthcheck_timeout,
|
||||
@@ -761,6 +737,19 @@ impl ConnectionPool {
|
||||
});
|
||||
}
|
||||
|
||||
// If the role is replica and we allow sending traffic to primary when replicas are unavailble,
|
||||
// we add primary address at the end of the list of candidates, this way it will be tried when
|
||||
// replicas are all unavailable.
|
||||
if role == Role::Replica && self.settings.replica_to_primary_failover_enabled {
|
||||
let mut primaries = self
|
||||
.addresses
|
||||
.iter()
|
||||
.flatten()
|
||||
.filter(|address| address.role == Role::Primary)
|
||||
.collect::<Vec<&Address>>();
|
||||
candidates.insert(0, primaries.pop().unwrap());
|
||||
}
|
||||
|
||||
// Indicate we're waiting on a server connection from a pool.
|
||||
let now = Instant::now();
|
||||
client_stats.waiting();
|
||||
@@ -965,24 +954,28 @@ impl ConnectionPool {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check if all replicas are banned, in that case unban all of them
|
||||
let replicas_available = self.addresses[address.shard]
|
||||
.iter()
|
||||
.filter(|addr| addr.role == Role::Replica)
|
||||
.count();
|
||||
// If we have replica to primary failover we should not unban replicas
|
||||
// as we still have the primary to server traffic.
|
||||
if !self.settings.replica_to_primary_failover_enabled {
|
||||
// Check if all replicas are banned, in that case unban all of them
|
||||
let replicas_available = self.addresses[address.shard]
|
||||
.iter()
|
||||
.filter(|addr| addr.role == Role::Replica)
|
||||
.count();
|
||||
|
||||
debug!("Available targets: {}", replicas_available);
|
||||
debug!("Available targets: {}", replicas_available);
|
||||
|
||||
let read_guard = self.banlist.read();
|
||||
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
|
||||
drop(read_guard);
|
||||
let read_guard = self.banlist.read();
|
||||
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
|
||||
drop(read_guard);
|
||||
|
||||
if all_replicas_banned {
|
||||
let mut write_guard = self.banlist.write();
|
||||
warn!("Unbanning all replicas.");
|
||||
write_guard[address.shard].clear();
|
||||
if all_replicas_banned {
|
||||
let mut write_guard = self.banlist.write();
|
||||
warn!("Unbanning all replicas.");
|
||||
write_guard[address.shard].clear();
|
||||
|
||||
return true;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if ban time is expired
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
/// or implied query characteristics.
|
||||
use bytes::{Buf, BytesMut};
|
||||
use log::{debug, error};
|
||||
use mini_moka::sync::Cache;
|
||||
use once_cell::sync::OnceCell;
|
||||
use regex::{Regex, RegexSet};
|
||||
use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update};
|
||||
@@ -12,7 +11,6 @@ use sqlparser::ast::{
|
||||
};
|
||||
use sqlparser::dialect::PostgreSqlDialect;
|
||||
use sqlparser::parser::Parser;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use crate::config::Role;
|
||||
use crate::errors::Error;
|
||||
@@ -23,7 +21,6 @@ use crate::sharding::Sharder;
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::io::Cursor;
|
||||
use std::time::Duration;
|
||||
use std::{cmp, mem};
|
||||
|
||||
/// Regexes used to parse custom commands.
|
||||
@@ -69,18 +66,6 @@ static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
|
||||
// Get the value inside the custom command.
|
||||
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
enum DatabaseActivityState {
|
||||
Active,
|
||||
Initializing,
|
||||
}
|
||||
|
||||
// A moka cache for the databases
|
||||
// the key is the database name and the value is the database activity state
|
||||
static DATABASE_ACTIVITY_CACHE: OnceLock<Cache<String, DatabaseActivityState>> = OnceLock::new();
|
||||
// A moka cache for the tables, the key is the db_table.
|
||||
static TABLE_MUTATIONS_CACHE: OnceLock<Cache<String, bool>> = OnceLock::new();
|
||||
|
||||
/// The query router.
|
||||
pub struct QueryRouter {
|
||||
/// Which shard we should be talking to right now.
|
||||
@@ -102,12 +87,6 @@ pub struct QueryRouter {
|
||||
placeholders: Vec<i16>,
|
||||
}
|
||||
|
||||
struct ExtractedExprsAndTables<'a> {
|
||||
exprs: Vec<Expr>,
|
||||
table_names: Vec<Vec<Ident>>,
|
||||
assignments_opt: Option<&'a Vec<Assignment>>,
|
||||
}
|
||||
|
||||
impl QueryRouter {
|
||||
/// One-time initialization of regexes
|
||||
/// that parse our custom SQL protocol.
|
||||
@@ -419,41 +398,6 @@ impl QueryRouter {
|
||||
}
|
||||
}
|
||||
|
||||
fn database_activity_cache(&self) -> Cache<String, DatabaseActivityState> {
|
||||
DATABASE_ACTIVITY_CACHE
|
||||
.get_or_init(|| {
|
||||
Cache::builder()
|
||||
.time_to_idle(Duration::from_secs(self.pool_settings.db_activity_ttl))
|
||||
.build()
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Check database activity state and reset it if necessary
|
||||
fn database_activity_state(&self, db: &String) -> DatabaseActivityState {
|
||||
let cache = self.database_activity_cache();
|
||||
|
||||
// Exists in cache
|
||||
if cache.contains_key(db) {
|
||||
return cache.get(db).unwrap();
|
||||
}
|
||||
|
||||
// Not in cache
|
||||
debug!("Adding database to cache: {}", db);
|
||||
|
||||
cache.insert(db.to_string(), DatabaseActivityState::Initializing);
|
||||
|
||||
// Set a timer to update the cache
|
||||
let db = db.clone();
|
||||
let db_activity_init_delay = self.pool_settings.db_activity_init_delay;
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(db_activity_init_delay)).await;
|
||||
cache.insert(db, DatabaseActivityState::Active);
|
||||
});
|
||||
|
||||
DatabaseActivityState::Initializing
|
||||
}
|
||||
|
||||
/// Try to infer which server to connect to based on the contents of the query.
|
||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
||||
if !self.pool_settings.query_parser_read_write_splitting {
|
||||
@@ -468,23 +412,9 @@ impl QueryRouter {
|
||||
return Err(Error::QueryRouterParserError("empty query".into()));
|
||||
}
|
||||
|
||||
let mut primary_set_based_on_activity = false;
|
||||
let mut visited_write_statement = false;
|
||||
let mut prev_inferred_shard = None;
|
||||
|
||||
if self.pool_settings.db_activity_based_routing {
|
||||
let db = self.pool_settings.db.clone();
|
||||
let state = self.database_activity_state(&db);
|
||||
debug!("Database activity state: {:?}", state);
|
||||
|
||||
if let DatabaseActivityState::Initializing = state {
|
||||
debug!("Database is initializing, going to primary");
|
||||
|
||||
self.active_role = Some(Role::Primary);
|
||||
primary_set_based_on_activity = true;
|
||||
}
|
||||
}
|
||||
|
||||
for q in ast {
|
||||
match q {
|
||||
// All transactions go to the primary, probably a write.
|
||||
@@ -495,22 +425,6 @@ impl QueryRouter {
|
||||
|
||||
// Likely a read-only query
|
||||
Query(query) => {
|
||||
if primary_set_based_on_activity {
|
||||
// If we already set the role based on activity, we don't need to do it again
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.pool_settings.db_activity_based_routing {
|
||||
// Check if the tables in the query have been written to recently
|
||||
if self.query_handles_tables_in_mutation_cache(query) {
|
||||
debug!("Query handles tables in mutation cache, going to primary");
|
||||
|
||||
self.active_role = Some(Role::Primary);
|
||||
primary_set_based_on_activity = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match &self.pool_settings.automatic_sharding_key {
|
||||
Some(_) => {
|
||||
// TODO: if we have multiple queries in the same message,
|
||||
@@ -541,13 +455,6 @@ impl QueryRouter {
|
||||
|
||||
// Likely a write
|
||||
_ => {
|
||||
debug!("Write statement found, going to primary");
|
||||
|
||||
if self.pool_settings.db_activity_based_routing {
|
||||
// add all of the query tables to the mutation cache
|
||||
self.update_mutation_cache_on_write(q);
|
||||
}
|
||||
|
||||
match &self.pool_settings.automatic_sharding_key {
|
||||
Some(_) => {
|
||||
// TODO: similar to the above, if we have multiple queries in the
|
||||
@@ -590,69 +497,62 @@ impl QueryRouter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn table_mutations_cache(&self) -> Cache<String, bool> {
|
||||
TABLE_MUTATIONS_CACHE
|
||||
.get_or_init(|| {
|
||||
Cache::builder()
|
||||
.time_to_live(Duration::from_millis(
|
||||
self.pool_settings.table_mutation_cache_ms_ttl,
|
||||
))
|
||||
.build()
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
|
||||
fn query_handles_tables_in_mutation_cache(&self, query: &sqlparser::ast::Query) -> bool {
|
||||
let table_mutations_cache = self.table_mutations_cache();
|
||||
debug!("Checking if query handles tables in mutation cache");
|
||||
debug!("Table mutations cache: {:?}", table_mutations_cache);
|
||||
|
||||
for tables in self.table_names(query) {
|
||||
for table in tables {
|
||||
if table_mutations_cache.contains_key(&self.table_mutation_cache_key(table)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
fn extract_exprs_and_table_names<'a>(
|
||||
&'a self,
|
||||
q: &'a Statement,
|
||||
) -> Option<ExtractedExprsAndTables<'a>> {
|
||||
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
|
||||
let mut exprs = Vec::new();
|
||||
|
||||
// Collect all table names from the query.
|
||||
let mut table_names = Vec::new();
|
||||
let mut assignments_opt = None;
|
||||
|
||||
match q {
|
||||
Insert(i) => {
|
||||
Insert {
|
||||
or,
|
||||
into: _,
|
||||
table_name,
|
||||
columns,
|
||||
overwrite: _,
|
||||
source,
|
||||
partitioned,
|
||||
after_columns,
|
||||
table: _,
|
||||
on: _,
|
||||
returning: _,
|
||||
ignore: _,
|
||||
} => {
|
||||
// Not supported in postgres.
|
||||
assert!(i.or.is_none());
|
||||
assert!(i.partitioned.is_none());
|
||||
assert!(i.after_columns.is_empty());
|
||||
assert!(or.is_none());
|
||||
assert!(partitioned.is_none());
|
||||
assert!(after_columns.is_empty());
|
||||
|
||||
Self::process_table(&i.table_name, &mut table_names);
|
||||
if let Some(source) = &i.source {
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns));
|
||||
Self::process_table(table_name, &mut table_names);
|
||||
if let Some(source) = source {
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
||||
}
|
||||
}
|
||||
Delete(d) => {
|
||||
if let Some(expr) = &d.selection {
|
||||
Delete {
|
||||
tables,
|
||||
from,
|
||||
using,
|
||||
selection,
|
||||
returning: _,
|
||||
order_by: _,
|
||||
limit: _,
|
||||
} => {
|
||||
if let Some(expr) = selection {
|
||||
exprs.push(expr.clone());
|
||||
}
|
||||
|
||||
// Multi-tables delete are not supported in postgres.
|
||||
assert!(d.tables.is_empty());
|
||||
// Multi tables delete are not supported in postgres.
|
||||
assert!(tables.is_empty());
|
||||
|
||||
if let Some(using_tbl_with_join) = &d.using {
|
||||
Self::process_tables_with_join(from, &mut exprs, &mut table_names);
|
||||
if let Some(using_tbl_with_join) = using {
|
||||
Self::process_tables_with_join(
|
||||
using_tbl_with_join,
|
||||
&mut exprs,
|
||||
&mut table_names,
|
||||
);
|
||||
}
|
||||
Self::process_selection(&d.selection, &mut exprs);
|
||||
Self::process_selection(selection, &mut exprs);
|
||||
}
|
||||
Update {
|
||||
table,
|
||||
@@ -666,55 +566,14 @@ impl QueryRouter {
|
||||
Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names);
|
||||
}
|
||||
Self::process_selection(selection, &mut exprs);
|
||||
|
||||
assignments_opt = Some(assignments);
|
||||
}
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
Some(ExtractedExprsAndTables {
|
||||
exprs,
|
||||
table_names,
|
||||
assignments_opt,
|
||||
})
|
||||
}
|
||||
|
||||
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
|
||||
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
|
||||
let exprs = extracted.exprs;
|
||||
let table_names = extracted.table_names;
|
||||
let assignments_opt = extracted.assignments_opt;
|
||||
|
||||
if let Some(assignments) = assignments_opt {
|
||||
self.assignment_parser(assignments)?;
|
||||
}
|
||||
|
||||
Ok(self.infer_shard_from_exprs(exprs, table_names))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn update_mutation_cache_on_write(&self, q: &Statement) {
|
||||
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
|
||||
debug!("Updating mutation cache on write");
|
||||
|
||||
let table_names = extracted.table_names;
|
||||
debug!("Table names in mutation query: {:?}", table_names);
|
||||
let table_mutations_cache = self.table_mutations_cache();
|
||||
for tables in table_names {
|
||||
for table in tables {
|
||||
table_mutations_cache.insert(self.table_mutation_cache_key(table), true);
|
||||
}
|
||||
_ => {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// combines the database name and table name into a single string
|
||||
// to be used as the key in the table mutation cache
|
||||
// e.g. "mydb.mytable"
|
||||
fn table_mutation_cache_key(&self, table: Ident) -> String {
|
||||
format!("{}.{}", self.pool_settings.db, table.value)
|
||||
Ok(self.infer_shard_from_exprs(exprs, table_names))
|
||||
}
|
||||
|
||||
fn process_query(
|
||||
@@ -963,13 +822,7 @@ impl QueryRouter {
|
||||
|
||||
for a in assignments {
|
||||
if sharding_key[0].value == "*"
|
||||
&& sharding_key[1].value
|
||||
== a.target
|
||||
.to_string()
|
||||
.split('.')
|
||||
.last()
|
||||
.unwrap()
|
||||
.to_lowercase()
|
||||
&& sharding_key[1].value == a.id.last().unwrap().value.to_lowercase()
|
||||
{
|
||||
return Err(Error::QueryRouterParserError(
|
||||
"Sharding key cannot be updated.".into(),
|
||||
@@ -1102,18 +955,6 @@ impl QueryRouter {
|
||||
self.infer_shard_from_exprs(exprs, table_names)
|
||||
}
|
||||
|
||||
/// get table names from query
|
||||
fn table_names(&self, query: &sqlparser::ast::Query) -> Vec<Vec<Ident>> {
|
||||
let mut exprs = Vec::new();
|
||||
|
||||
let mut table_names = Vec::new();
|
||||
Self::process_query(query, &mut exprs, &mut table_names, &None);
|
||||
|
||||
debug!("Table names in query: {:?}", table_names);
|
||||
|
||||
table_names
|
||||
}
|
||||
|
||||
fn infer_shard_from_exprs(
|
||||
&mut self,
|
||||
exprs: Vec<Expr>,
|
||||
@@ -1281,7 +1122,6 @@ mod test {
|
||||
use crate::messages::simple_query;
|
||||
use crate::sharding::ShardingFunction;
|
||||
use bytes::BufMut;
|
||||
use serial_test::serial;
|
||||
|
||||
#[test]
|
||||
fn test_defaults() {
|
||||
@@ -1617,9 +1457,9 @@ mod test {
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||
checkout_failure_limit: None,
|
||||
shards: 2,
|
||||
user: crate::config::User::default(),
|
||||
replica_to_primary_failover_enabled: false,
|
||||
default_role: Some(Role::Replica),
|
||||
query_parser_enabled: true,
|
||||
query_parser_max_length: None,
|
||||
@@ -1638,10 +1478,6 @@ mod test {
|
||||
auth_query_password: None,
|
||||
auth_query_user: None,
|
||||
db: "test".to_string(),
|
||||
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
|
||||
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
|
||||
db_activity_ttl: PoolSettings::default().db_activity_ttl,
|
||||
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
|
||||
plugins: None,
|
||||
};
|
||||
let mut qr = QueryRouter::new();
|
||||
@@ -1700,10 +1536,10 @@ mod test {
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||
checkout_failure_limit: Some(10),
|
||||
shards: 5,
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
replica_to_primary_failover_enabled: false,
|
||||
query_parser_enabled: true,
|
||||
query_parser_max_length: None,
|
||||
query_parser_read_write_splitting: true,
|
||||
@@ -1721,10 +1557,6 @@ mod test {
|
||||
auth_query_password: None,
|
||||
auth_query_user: None,
|
||||
db: "test".to_string(),
|
||||
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
|
||||
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
|
||||
db_activity_ttl: PoolSettings::default().db_activity_ttl,
|
||||
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
|
||||
plugins: None,
|
||||
};
|
||||
|
||||
@@ -2140,150 +1972,4 @@ mod test {
|
||||
|
||||
assert_eq!(res, Ok(PluginOutput::Allow));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_db_activity_based_routing_initializing_state() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.db = "test_table_mutation_cache".to_string();
|
||||
|
||||
qr.database_activity_cache()
|
||||
.invalidate(&qr.pool_settings.db.clone());
|
||||
|
||||
let query = simple_query("SELECT * FROM some_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
|
||||
// Initially, the database activity should be in the "Initializing" state
|
||||
let state = qr.database_activity_state(&qr.pool_settings.db.clone());
|
||||
assert_eq!(state, DatabaseActivityState::Initializing);
|
||||
|
||||
// Check that the router chooses the primary role due to "Initializing" state
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_db_activity_based_routing_active_state() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.db = "test_table_mutation_cache".to_string();
|
||||
|
||||
let db_name = qr.pool_settings.db.clone();
|
||||
let cache = qr.database_activity_cache();
|
||||
cache.insert(db_name.clone(), DatabaseActivityState::Active);
|
||||
|
||||
let query = simple_query("SELECT * FROM some_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
|
||||
// Check that the router can choose a replica role when in "Active" state
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), None); // Default should allow replica due to active state
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_table_mutation_cache_on_write() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.table_mutation_cache_ms_ttl = 20_000; // 20 seconds in milliseconds
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.db = "test_table_mutation_cache".to_string();
|
||||
|
||||
qr.database_activity_cache()
|
||||
.invalidate(&qr.pool_settings.db.clone());
|
||||
|
||||
let query = simple_query("UPDATE some_table SET col1 = 'value' WHERE col2 = 1");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
|
||||
// Simulate the mutation query which should populate the mutation cache
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
let table_cache_key = qr.table_mutation_cache_key(Ident::new("some_table"));
|
||||
let cache = qr.table_mutations_cache();
|
||||
|
||||
// Ensure the table mutation cache contains the table with recent write
|
||||
assert!(cache.contains_key(&table_cache_key));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_db_activity_based_routing_multi_query() {
|
||||
use super::*;
|
||||
use crate::messages::simple_query;
|
||||
use tokio::time::Duration;
|
||||
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
|
||||
// Configure the pool settings for db_activity_based_routing
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
qr.pool_settings.db_activity_based_routing = true;
|
||||
qr.pool_settings.db = "test_db_activity_routing".to_string();
|
||||
|
||||
qr.database_activity_cache()
|
||||
.invalidate(&qr.pool_settings.db.clone());
|
||||
|
||||
// First query when database is initializing
|
||||
let query = simple_query("SELECT * FROM test_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to primary because database is initializing
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
// Wait for the initialization delay to pass
|
||||
tokio::time::sleep(Duration::from_millis(
|
||||
qr.pool_settings.db_activity_init_delay * 2,
|
||||
))
|
||||
.await;
|
||||
|
||||
// Next query after database is active
|
||||
let query = simple_query("SELECT * FROM test_table");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to replica because database is active and no recent mutations
|
||||
assert_eq!(qr.role(), None);
|
||||
|
||||
// Simulate a write query to update the mutation cache
|
||||
let query = simple_query("INSERT INTO test_table (id, name) VALUES (1, 'test')");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to primary because it's a write operation
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
// Immediately run a read query on the same table
|
||||
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to primary because the table was recently mutated
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
// Wait for the mutation cache TTL to expire
|
||||
tokio::time::sleep(Duration::from_millis(
|
||||
qr.pool_settings.table_mutation_cache_ms_ttl * 2,
|
||||
))
|
||||
.await;
|
||||
|
||||
// Run the read query again after cache expiration
|
||||
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
qr.active_role = None; // Reset the active_role
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
// Should route to replica because mutation cache has expired
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
require_relative "spec_helper"
|
||||
|
||||
describe "Random Load Balancing" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||
@@ -8,7 +8,7 @@ describe "Random Load Balancing" do
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
context "under regular circumstances" do
|
||||
context("under regular circumstances") do
|
||||
it "balances query volume between all instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
@@ -22,14 +22,14 @@ describe "Random Load Balancing" do
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
expect(failed_count).to(eq(0))
|
||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when some replicas are down" do
|
||||
context("when some replicas are down") do
|
||||
it "balances query volume between working instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||
@@ -49,48 +49,13 @@ describe "Random Load Balancing" do
|
||||
processes.all_databases.each do |instance|
|
||||
queries_routed = instance.count_select_1_plus_2
|
||||
if processes.replicas[0..1].include?(instance)
|
||||
expect(queries_routed).to eq(0)
|
||||
expect(queries_routed).to(eq(0))
|
||||
else
|
||||
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when all replicas are down " do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
|
||||
|
||||
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count = 0
|
||||
number_of_replicas = processes[:replicas].length
|
||||
|
||||
# Take down all replicas
|
||||
processes[:replicas].each(&:take_down)
|
||||
|
||||
(number_of_replicas + 1).times do |n|
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(number_of_replicas + 1)
|
||||
failed_count = 0
|
||||
|
||||
# Ban_time is configured to 60 so this reset will only work
|
||||
# if the replicas are unbanned automatically
|
||||
processes[:replicas].each(&:reset)
|
||||
|
||||
number_of_replicas.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
expect(failed_count).to eq(0)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Least Outstanding Queries Load Balancing" do
|
||||
@@ -100,7 +65,7 @@ describe "Least Outstanding Queries Load Balancing" do
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
context "under homogeneous load" do
|
||||
context("under homogeneous load") do
|
||||
it "balances query volume between all instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
@@ -114,15 +79,15 @@ describe "Least Outstanding Queries Load Balancing" do
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
expect(failed_count).to(eq(0))
|
||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "under heterogeneous load" do
|
||||
xit "balances query volume between all instances based on how busy they are" do
|
||||
context("under heterogeneous load") do
|
||||
xit("balances query volume between all instances based on how busy they are") do
|
||||
slow_query_count = 2
|
||||
threads = Array.new(slow_query_count) do
|
||||
Thread.new do
|
||||
@@ -143,31 +108,32 @@ describe "Least Outstanding Queries Load Balancing" do
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
expect(failed_count).to(eq(0))
|
||||
# Under LOQ, we expect replicas running the slow pg_sleep
|
||||
# to get no selects
|
||||
expect(
|
||||
processes.
|
||||
all_databases.
|
||||
map(&:count_select_1_plus_2).
|
||||
count { |instance_share| instance_share == 0 }
|
||||
).to eq(slow_query_count)
|
||||
processes
|
||||
.all_databases
|
||||
.map(&:count_select_1_plus_2)
|
||||
.count { |instance_share| instance_share == 0 }
|
||||
)
|
||||
.to(eq(slow_query_count))
|
||||
|
||||
# We also expect the quick queries to be spread across
|
||||
# the idle servers only
|
||||
processes.
|
||||
all_databases.
|
||||
map(&:count_select_1_plus_2).
|
||||
reject { |instance_share| instance_share == 0 }.
|
||||
each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
processes
|
||||
.all_databases
|
||||
.map(&:count_select_1_plus_2)
|
||||
.reject { |instance_share| instance_share == 0 }
|
||||
.each do |instance_share|
|
||||
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||
end
|
||||
|
||||
threads.map(&:join)
|
||||
end
|
||||
end
|
||||
|
||||
context "when some replicas are down" do
|
||||
context("when some replicas are down") do
|
||||
it "balances query volume between working instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||
@@ -184,13 +150,104 @@ describe "Least Outstanding Queries Load Balancing" do
|
||||
end
|
||||
end
|
||||
|
||||
expect(failed_count).to be <= 2
|
||||
expect(failed_count).to(be <= 2)
|
||||
processes.all_databases.each do |instance|
|
||||
queries_routed = instance.count_select_1_plus_2
|
||||
if processes.replicas[0..1].include?(instance)
|
||||
expect(queries_routed).to eq(0)
|
||||
expect(queries_routed).to(eq(0))
|
||||
else
|
||||
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Candidate filtering based on `default_pool`" do
|
||||
let(:processes) {
|
||||
Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", pool_settings)
|
||||
}
|
||||
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
context("with default_pool set to replicas") do
|
||||
context("when all replicas are down ") do
|
||||
let(:pool_settings) do
|
||||
{
|
||||
"default_role" => "replica",
|
||||
"replica_to_primary_failover_enabled" => replica_to_primary_failover_enabled
|
||||
}
|
||||
end
|
||||
|
||||
context("with `replica_to_primary_failover_enabled` set to false`") do
|
||||
let(:replica_to_primary_failover_enabled) { false }
|
||||
|
||||
it(
|
||||
"unbans them automatically to prevent false positives in health checks that could make all replicas unavailable"
|
||||
) do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count = 0
|
||||
number_of_replicas = processes[:replicas].length
|
||||
|
||||
# Take down all replicas
|
||||
processes[:replicas].each(&:take_down)
|
||||
|
||||
(number_of_replicas + 1).times do |n|
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to(eq(number_of_replicas + 1))
|
||||
failed_count = 0
|
||||
|
||||
# Ban_time is configured to 60 so this reset will only work
|
||||
# if the replicas are unbanned automatically
|
||||
processes[:replicas].each(&:reset)
|
||||
|
||||
number_of_replicas.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to(eq(0))
|
||||
end
|
||||
end
|
||||
|
||||
context("with `replica_to_primary_failover_enabled` set to true`") do
|
||||
let(:replica_to_primary_failover_enabled) { true }
|
||||
|
||||
it "does not unbans them automatically" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count = 0
|
||||
number_of_replicas = processes[:replicas].length
|
||||
|
||||
# We need to allow pgcat to open connections to replicas
|
||||
(number_of_replicas + 10).times do |n|
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
expect(failed_count).to(eq(0))
|
||||
|
||||
# Take down all replicas
|
||||
processes[:replicas].each(&:take_down)
|
||||
|
||||
(number_of_replicas + 10).times do |n|
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to(eq(number_of_replicas))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -188,102 +188,6 @@ describe "Miscellaneous" do
|
||||
end
|
||||
end
|
||||
|
||||
describe "Checkout failure limit" do
|
||||
context "when no checkout failure limit is set" do
|
||||
before do
|
||||
new_configs = processes.pgcat.current_config
|
||||
new_configs["general"]["connect_timeout"] = 200
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
end
|
||||
|
||||
it "does not disconnect client" do
|
||||
Array.new(5) do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
for i in 0..4
|
||||
begin
|
||||
conn.async_exec("SELECT pg_sleep(0.5);")
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::SystemError
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
end
|
||||
end
|
||||
conn.close
|
||||
end
|
||||
end.each(&:join)
|
||||
end
|
||||
end
|
||||
|
||||
context "when checkout failure limit is set high" do
|
||||
before do
|
||||
new_configs = processes.pgcat.current_config
|
||||
new_configs["general"]["connect_timeout"] = 200
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 10000
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
end
|
||||
|
||||
it "does not disconnect client" do
|
||||
Array.new(5) do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
for i in 0..4
|
||||
begin
|
||||
conn.async_exec("SELECT pg_sleep(0.5);")
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::SystemError
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
end
|
||||
end
|
||||
conn.close
|
||||
end
|
||||
end.each(&:join)
|
||||
end
|
||||
end
|
||||
|
||||
context "when checkout failure limit is set low" do
|
||||
before do
|
||||
new_configs = processes.pgcat.current_config
|
||||
new_configs["general"]["connect_timeout"] = 200
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 2
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
end
|
||||
|
||||
it "disconnects client after reaching limit" do
|
||||
Array.new(5) do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
checkout_failure_count = 0
|
||||
for i in 0..4
|
||||
begin
|
||||
conn.async_exec("SELECT pg_sleep(1);")
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::SystemError
|
||||
checkout_failure_count += 1
|
||||
expect(conn.status).to eq(PG::CONNECTION_OK)
|
||||
rescue PG::ConnectionBad
|
||||
expect(checkout_failure_count).to eq(2)
|
||||
expect(conn.status).to eq(PG::CONNECTION_BAD)
|
||||
break
|
||||
end
|
||||
end
|
||||
conn.close
|
||||
end
|
||||
end.each(&:join)
|
||||
puts processes.pgcat.logs
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Server version reporting" do
|
||||
it "reports correct version for normal and admin databases" do
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
Reference in New Issue
Block a user