mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-25 18:06:29 +00:00
Compare commits
3 Commits
dependabot
...
levkk-asyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd3623ff13 | ||
|
|
088f1a7dae | ||
|
|
ab7ac16974 |
@@ -110,6 +110,10 @@ python3 tests/python/tests.py || exit 1
|
|||||||
|
|
||||||
start_pgcat "info"
|
start_pgcat "info"
|
||||||
|
|
||||||
|
python3 tests/python/async_test.py
|
||||||
|
|
||||||
|
start_pgcat "info"
|
||||||
|
|
||||||
# Admin tests
|
# Admin tests
|
||||||
export PGPASSWORD=admin_pass
|
export PGPASSWORD=admin_pass
|
||||||
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
||||||
|
|||||||
24
CONFIG.md
24
CONFIG.md
@@ -49,14 +49,6 @@ default: 30000 # milliseconds
|
|||||||
|
|
||||||
How long an idle connection with a server is left open (ms).
|
How long an idle connection with a server is left open (ms).
|
||||||
|
|
||||||
### server_lifetime
|
|
||||||
```
|
|
||||||
path: general.server_lifetime
|
|
||||||
default: 86400000 # 24 hours
|
|
||||||
```
|
|
||||||
|
|
||||||
Max connection lifetime before it's closed, even if actively used.
|
|
||||||
|
|
||||||
### idle_client_in_transaction_timeout
|
### idle_client_in_transaction_timeout
|
||||||
```
|
```
|
||||||
path: general.idle_client_in_transaction_timeout
|
path: general.idle_client_in_transaction_timeout
|
||||||
@@ -188,22 +180,6 @@ default: "admin_pass"
|
|||||||
|
|
||||||
Password to access the virtual administrative database
|
Password to access the virtual administrative database
|
||||||
|
|
||||||
### dns_cache_enabled
|
|
||||||
```
|
|
||||||
path: general.dns_cache_enabled
|
|
||||||
default: false
|
|
||||||
```
|
|
||||||
When enabled, ip resolutions for server connections specified using hostnames will be cached
|
|
||||||
and checked for changes every `dns_max_ttl` seconds. If a change in the host resolution is found
|
|
||||||
old ip connections are closed (gracefully) and new connections will start using new ip.
|
|
||||||
|
|
||||||
### dns_max_ttl
|
|
||||||
```
|
|
||||||
path: general.dns_max_ttl
|
|
||||||
default: 30
|
|
||||||
```
|
|
||||||
Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
|
|
||||||
|
|
||||||
## `pools.<pool_name>` Section
|
## `pools.<pool_name>` Section
|
||||||
|
|
||||||
### pool_mode
|
### pool_mode
|
||||||
|
|||||||
499
Cargo.lock
generated
499
Cargo.lock
generated
@@ -26,27 +26,6 @@ version = "1.6.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
|
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-stream"
|
|
||||||
version = "0.3.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
|
|
||||||
dependencies = [
|
|
||||||
"async-stream-impl",
|
|
||||||
"futures-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-stream-impl"
|
|
||||||
version = "0.3.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
version = "0.1.68"
|
version = "0.1.68"
|
||||||
@@ -233,12 +212,6 @@ dependencies = [
|
|||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "data-encoding"
|
|
||||||
version = "2.3.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "digest"
|
name = "digest"
|
||||||
version = "0.10.6"
|
version = "0.10.6"
|
||||||
@@ -250,24 +223,6 @@ dependencies = [
|
|||||||
"subtle",
|
"subtle",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "either"
|
|
||||||
version = "1.8.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "enum-as-inner"
|
|
||||||
version = "0.5.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116"
|
|
||||||
dependencies = [
|
|
||||||
"heck",
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "env_logger"
|
name = "env_logger"
|
||||||
version = "0.10.0"
|
version = "0.10.0"
|
||||||
@@ -320,15 +275,6 @@ version = "1.0.7"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "form_urlencoded"
|
|
||||||
version = "1.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
|
|
||||||
dependencies = [
|
|
||||||
"percent-encoding",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures"
|
name = "futures"
|
||||||
version = "0.3.28"
|
version = "0.3.28"
|
||||||
@@ -464,12 +410,6 @@ version = "0.12.3"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "heck"
|
|
||||||
version = "0.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.2.6"
|
version = "0.2.6"
|
||||||
@@ -494,17 +434,6 @@ dependencies = [
|
|||||||
"digest",
|
"digest",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "hostname"
|
|
||||||
version = "0.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
"match_cfg",
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http"
|
name = "http"
|
||||||
version = "0.2.9"
|
version = "0.2.9"
|
||||||
@@ -562,7 +491,7 @@ dependencies = [
|
|||||||
"httpdate",
|
"httpdate",
|
||||||
"itoa",
|
"itoa",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"socket2 0.4.9",
|
"socket2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
@@ -593,27 +522,6 @@ dependencies = [
|
|||||||
"cxx-build",
|
"cxx-build",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "idna"
|
|
||||||
version = "0.2.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
|
|
||||||
dependencies = [
|
|
||||||
"matches",
|
|
||||||
"unicode-bidi",
|
|
||||||
"unicode-normalization",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "idna"
|
|
||||||
version = "0.3.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
|
|
||||||
dependencies = [
|
|
||||||
"unicode-bidi",
|
|
||||||
"unicode-normalization",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "indexmap"
|
name = "indexmap"
|
||||||
version = "1.9.2"
|
version = "1.9.2"
|
||||||
@@ -631,27 +539,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
|
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ipconfig"
|
|
||||||
version = "0.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "bd302af1b90f2463a98fa5ad469fc212c8e3175a41c3068601bfa2727591c5be"
|
|
||||||
dependencies = [
|
|
||||||
"socket2 0.4.9",
|
|
||||||
"widestring",
|
|
||||||
"winapi",
|
|
||||||
"winreg",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ipnet"
|
|
||||||
version = "2.5.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f88c5561171189e69df9d98bcf18fd5f9558300f7ea7b801eb8a0fd748bd8745"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "is-terminal"
|
name = "is-terminal"
|
||||||
version = "0.4.4"
|
version = "0.4.4"
|
||||||
@@ -661,16 +551,7 @@ dependencies = [
|
|||||||
"hermit-abi 0.3.1",
|
"hermit-abi 0.3.1",
|
||||||
"io-lifetimes",
|
"io-lifetimes",
|
||||||
"rustix",
|
"rustix",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "itertools"
|
|
||||||
version = "0.10.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
|
|
||||||
dependencies = [
|
|
||||||
"either",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -708,17 +589,11 @@ dependencies = [
|
|||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "lazy_static"
|
|
||||||
version = "1.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.144"
|
version = "0.2.139"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "link-cplusplus"
|
name = "link-cplusplus"
|
||||||
@@ -729,12 +604,6 @@ dependencies = [
|
|||||||
"cc",
|
"cc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "linked-hash-map"
|
|
||||||
version = "0.5.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "linux-raw-sys"
|
name = "linux-raw-sys"
|
||||||
version = "0.1.4"
|
version = "0.1.4"
|
||||||
@@ -760,27 +629,6 @@ dependencies = [
|
|||||||
"cfg-if",
|
"cfg-if",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "lru-cache"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
|
|
||||||
dependencies = [
|
|
||||||
"linked-hash-map",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "match_cfg"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "matches"
|
|
||||||
version = "0.1.9"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "md-5"
|
name = "md-5"
|
||||||
version = "0.10.5"
|
version = "0.10.5"
|
||||||
@@ -814,7 +662,7 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -886,18 +734,12 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
"redox_syscall",
|
"redox_syscall",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "percent-encoding"
|
|
||||||
version = "2.2.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.0.2-alpha3"
|
version = "1.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -912,7 +754,6 @@ dependencies = [
|
|||||||
"futures",
|
"futures",
|
||||||
"hmac",
|
"hmac",
|
||||||
"hyper",
|
"hyper",
|
||||||
"itertools",
|
|
||||||
"jemallocator",
|
"jemallocator",
|
||||||
"log",
|
"log",
|
||||||
"md-5",
|
"md-5",
|
||||||
@@ -921,26 +762,20 @@ dependencies = [
|
|||||||
"once_cell",
|
"once_cell",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"phf",
|
"phf",
|
||||||
"pin-project",
|
|
||||||
"postgres-protocol",
|
"postgres-protocol",
|
||||||
"rand",
|
"rand",
|
||||||
"regex",
|
"regex",
|
||||||
"rustls",
|
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
|
||||||
"sha-1",
|
"sha-1",
|
||||||
"sha2",
|
"sha2",
|
||||||
"socket2 0.5.3",
|
"socket2",
|
||||||
"sqlparser",
|
"sqlparser",
|
||||||
"stringprep",
|
"stringprep",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
"tokio-test",
|
|
||||||
"toml",
|
"toml",
|
||||||
"trust-dns-resolver",
|
|
||||||
"webpki-roots",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -985,26 +820,6 @@ dependencies = [
|
|||||||
"siphasher",
|
"siphasher",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "pin-project"
|
|
||||||
version = "1.0.12"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
|
|
||||||
dependencies = [
|
|
||||||
"pin-project-internal",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "pin-project-internal"
|
|
||||||
version = "1.0.12"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.9"
|
version = "0.2.9"
|
||||||
@@ -1050,12 +865,6 @@ dependencies = [
|
|||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "quick-error"
|
|
||||||
version = "1.2.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "quote"
|
name = "quote"
|
||||||
version = "1.0.26"
|
version = "1.0.26"
|
||||||
@@ -1106,9 +915,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex"
|
name = "regex"
|
||||||
version = "1.8.1"
|
version = "1.8.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370"
|
checksum = "ac6cf59af1067a3fb53fbe5c88c053764e930f932be1d71d3ffe032cbe147f59"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aho-corasick",
|
"aho-corasick",
|
||||||
"memchr",
|
"memchr",
|
||||||
@@ -1117,19 +926,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex-syntax"
|
name = "regex-syntax"
|
||||||
version = "0.7.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "resolv-conf"
|
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00"
|
checksum = "b6868896879ba532248f33598de5181522d8b3d9d724dfd230911e1a7d4822f5"
|
||||||
dependencies = [
|
|
||||||
"hostname",
|
|
||||||
"quick-error",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ring"
|
name = "ring"
|
||||||
@@ -1157,14 +956,14 @@ dependencies = [
|
|||||||
"io-lifetimes",
|
"io-lifetimes",
|
||||||
"libc",
|
"libc",
|
||||||
"linux-raw-sys",
|
"linux-raw-sys",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustls"
|
name = "rustls"
|
||||||
version = "0.21.1"
|
version = "0.21.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c911ba11bc8433e811ce56fde130ccf32f5127cab0e0194e9c68c5a5b671791e"
|
checksum = "07180898a28ed6a7f7ba2311594308f595e3dd2e3c3812fa0a80a47b45f17e5d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"ring",
|
"ring",
|
||||||
@@ -1191,12 +990,6 @@ dependencies = [
|
|||||||
"untrusted",
|
"untrusted",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ryu"
|
|
||||||
version = "1.0.13"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "scopeguard"
|
name = "scopeguard"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
@@ -1224,9 +1017,6 @@ name = "serde"
|
|||||||
version = "1.0.160"
|
version = "1.0.160"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
|
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
|
||||||
dependencies = [
|
|
||||||
"serde_derive",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
@@ -1239,17 +1029,6 @@ dependencies = [
|
|||||||
"syn 2.0.9",
|
"syn 2.0.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "serde_json"
|
|
||||||
version = "1.0.96"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
|
|
||||||
dependencies = [
|
|
||||||
"itoa",
|
|
||||||
"ryu",
|
|
||||||
"serde",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_spanned"
|
name = "serde_spanned"
|
||||||
version = "0.6.1"
|
version = "0.6.1"
|
||||||
@@ -1313,24 +1092,14 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "socket2"
|
name = "socket2"
|
||||||
version = "0.4.9"
|
version = "0.4.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
|
checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "socket2"
|
|
||||||
version = "0.5.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
"windows-sys 0.48.0",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "spin"
|
name = "spin"
|
||||||
version = "0.5.2"
|
version = "0.5.2"
|
||||||
@@ -1344,18 +1113,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
|
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "sqlparser_derive"
|
|
||||||
version = "0.1.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1411,26 +1168,6 @@ dependencies = [
|
|||||||
"winapi-util",
|
"winapi-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "thiserror"
|
|
||||||
version = "1.0.37"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
|
|
||||||
dependencies = [
|
|
||||||
"thiserror-impl",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "thiserror-impl"
|
|
||||||
version = "1.0.37"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.1.45"
|
version = "0.1.45"
|
||||||
@@ -1472,9 +1209,9 @@ dependencies = [
|
|||||||
"parking_lot",
|
"parking_lot",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
"socket2 0.4.9",
|
"socket2",
|
||||||
"tokio-macros",
|
"tokio-macros",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1498,30 +1235,6 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-stream"
|
|
||||||
version = "0.1.11"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
|
|
||||||
dependencies = [
|
|
||||||
"futures-core",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-test"
|
|
||||||
version = "0.4.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
|
|
||||||
dependencies = [
|
|
||||||
"async-stream",
|
|
||||||
"bytes",
|
|
||||||
"futures-core",
|
|
||||||
"tokio",
|
|
||||||
"tokio-stream",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-util"
|
name = "tokio-util"
|
||||||
version = "0.7.7"
|
version = "0.7.7"
|
||||||
@@ -1584,21 +1297,9 @@ checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tracing-attributes",
|
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-attributes"
|
|
||||||
version = "0.1.23"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing-core"
|
name = "tracing-core"
|
||||||
version = "0.1.30"
|
version = "0.1.30"
|
||||||
@@ -1608,51 +1309,6 @@ dependencies = [
|
|||||||
"once_cell",
|
"once_cell",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "trust-dns-proto"
|
|
||||||
version = "0.22.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4f7f83d1e4a0e4358ac54c5c3681e5d7da5efc5a7a632c90bb6d6669ddd9bc26"
|
|
||||||
dependencies = [
|
|
||||||
"async-trait",
|
|
||||||
"cfg-if",
|
|
||||||
"data-encoding",
|
|
||||||
"enum-as-inner",
|
|
||||||
"futures-channel",
|
|
||||||
"futures-io",
|
|
||||||
"futures-util",
|
|
||||||
"idna 0.2.3",
|
|
||||||
"ipnet",
|
|
||||||
"lazy_static",
|
|
||||||
"rand",
|
|
||||||
"smallvec",
|
|
||||||
"thiserror",
|
|
||||||
"tinyvec",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
"url",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "trust-dns-resolver"
|
|
||||||
version = "0.22.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "aff21aa4dcefb0a1afbfac26deb0adc93888c7d295fb63ab273ef276ba2b7cfe"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"futures-util",
|
|
||||||
"ipconfig",
|
|
||||||
"lazy_static",
|
|
||||||
"lru-cache",
|
|
||||||
"parking_lot",
|
|
||||||
"resolv-conf",
|
|
||||||
"smallvec",
|
|
||||||
"thiserror",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
"trust-dns-proto",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "try-lock"
|
name = "try-lock"
|
||||||
version = "0.2.4"
|
version = "0.2.4"
|
||||||
@@ -1698,17 +1354,6 @@ version = "0.7.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
|
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "url"
|
|
||||||
version = "2.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
|
|
||||||
dependencies = [
|
|
||||||
"form_urlencoded",
|
|
||||||
"idna 0.3.0",
|
|
||||||
"percent-encoding",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "version_check"
|
name = "version_check"
|
||||||
version = "0.9.4"
|
version = "0.9.4"
|
||||||
@@ -1801,21 +1446,6 @@ dependencies = [
|
|||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "webpki-roots"
|
|
||||||
version = "0.23.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "aa54963694b65584e170cf5dc46aeb4dcaa5584e652ff5f3952e56d66aff0125"
|
|
||||||
dependencies = [
|
|
||||||
"rustls-webpki",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "widestring"
|
|
||||||
version = "0.5.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "17882f045410753661207383517a6f62ec3dbeb6a4ed2acce01f0728238d1983"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winapi"
|
name = "winapi"
|
||||||
version = "0.3.9"
|
version = "0.3.9"
|
||||||
@@ -1853,16 +1483,7 @@ version = "0.45.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
|
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"windows-targets 0.42.1",
|
"windows-targets",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-sys"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
|
|
||||||
dependencies = [
|
|
||||||
"windows-targets 0.48.0",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1871,28 +1492,13 @@ version = "0.42.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
|
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"windows_aarch64_gnullvm 0.42.1",
|
"windows_aarch64_gnullvm",
|
||||||
"windows_aarch64_msvc 0.42.1",
|
"windows_aarch64_msvc",
|
||||||
"windows_i686_gnu 0.42.1",
|
"windows_i686_gnu",
|
||||||
"windows_i686_msvc 0.42.1",
|
"windows_i686_msvc",
|
||||||
"windows_x86_64_gnu 0.42.1",
|
"windows_x86_64_gnu",
|
||||||
"windows_x86_64_gnullvm 0.42.1",
|
"windows_x86_64_gnullvm",
|
||||||
"windows_x86_64_msvc 0.42.1",
|
"windows_x86_64_msvc",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-targets"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
|
|
||||||
dependencies = [
|
|
||||||
"windows_aarch64_gnullvm 0.48.0",
|
|
||||||
"windows_aarch64_msvc 0.48.0",
|
|
||||||
"windows_i686_gnu 0.48.0",
|
|
||||||
"windows_i686_msvc 0.48.0",
|
|
||||||
"windows_x86_64_gnu 0.48.0",
|
|
||||||
"windows_x86_64_gnullvm 0.48.0",
|
|
||||||
"windows_x86_64_msvc 0.48.0",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1901,84 +1507,42 @@ version = "0.42.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
|
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_aarch64_gnullvm"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_aarch64_msvc"
|
name = "windows_aarch64_msvc"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
|
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_aarch64_msvc"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_i686_gnu"
|
name = "windows_i686_gnu"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
|
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_i686_gnu"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_i686_msvc"
|
name = "windows_i686_msvc"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
|
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_i686_msvc"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_x86_64_gnu"
|
name = "windows_x86_64_gnu"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
|
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_gnu"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_x86_64_gnullvm"
|
name = "windows_x86_64_gnullvm"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
|
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_gnullvm"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_x86_64_msvc"
|
name = "windows_x86_64_msvc"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
|
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_msvc"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winnow"
|
name = "winnow"
|
||||||
version = "0.3.3"
|
version = "0.3.3"
|
||||||
@@ -1987,12 +1551,3 @@ checksum = "faf09497b8f8b5ac5d3bb4d05c0a99be20f26fd3d5f2db7b0716e946d5103658"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "winreg"
|
|
||||||
version = "0.10.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
|
|
||||||
dependencies = [
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|||||||
16
Cargo.toml
16
Cargo.toml
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.0.2-alpha3"
|
version = "1.0.1"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
@@ -14,12 +14,12 @@ rand = "0.8"
|
|||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
sha-1 = "0.10"
|
sha-1 = "0.10"
|
||||||
toml = "0.7"
|
toml = "0.7"
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = "1"
|
||||||
serde_derive = "1"
|
serde_derive = "1"
|
||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = {version = "0.33", features = ["visitor"] }
|
sqlparser = "0.33.0"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
env_logger = "0.10"
|
env_logger = "0.10"
|
||||||
@@ -34,19 +34,11 @@ hyper = { version = "0.14", features = ["full"] }
|
|||||||
phf = { version = "0.11.1", features = ["macros"] }
|
phf = { version = "0.11.1", features = ["macros"] }
|
||||||
exitcode = "1.1.2"
|
exitcode = "1.1.2"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
socket2 = { version = "0.5.3", features = ["all"] }
|
socket2 = { version = "0.4.7", features = ["all"] }
|
||||||
nix = "0.26.2"
|
nix = "0.26.2"
|
||||||
atomic_enum = "0.2.0"
|
atomic_enum = "0.2.0"
|
||||||
postgres-protocol = "0.6.5"
|
postgres-protocol = "0.6.5"
|
||||||
fallible-iterator = "0.2"
|
fallible-iterator = "0.2"
|
||||||
pin-project = "1"
|
|
||||||
webpki-roots = "0.23"
|
|
||||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
|
||||||
trust-dns-resolver = "0.22.0"
|
|
||||||
tokio-test = "0.4.2"
|
|
||||||
serde_json = "1"
|
|
||||||
itertools = "0.10"
|
|
||||||
|
|
||||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||||
jemallocator = "0.5.0"
|
jemallocator = "0.5.0"
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load bal
|
|||||||
| Failover | **Stable** | Queries are automatically rerouted around broken replicas, validated by regular health checks. |
|
| Failover | **Stable** | Queries are automatically rerouted around broken replicas, validated by regular health checks. |
|
||||||
| Admin database statistics | **Stable** | Pooler statistics and administration via the `pgbouncer` and `pgcat` databases. |
|
| Admin database statistics | **Stable** | Pooler statistics and administration via the `pgbouncer` and `pgcat` databases. |
|
||||||
| Prometheus statistics | **Stable** | Statistics are reported via a HTTP endpoint for Prometheus. |
|
| Prometheus statistics | **Stable** | Statistics are reported via a HTTP endpoint for Prometheus. |
|
||||||
| SSL/TLS | **Stable** | Clients can connect to the pooler using TLS. Pooler can connect to Postgres servers using TLS. |
|
| Client TLS | **Stable** | Clients can connect to the pooler using TLS/SSL. |
|
||||||
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
|
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
|
||||||
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
|
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
|
||||||
| Auth passthrough | **Stable** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file.|
|
| Auth passthrough | **Stable** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file.|
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ x-common-env-pg:
|
|||||||
|
|
||||||
services:
|
services:
|
||||||
main:
|
main:
|
||||||
image: gcr.io/google_containers/pause:3.2
|
image: kubernetes/pause
|
||||||
ports:
|
ports:
|
||||||
- 6432
|
- 6432
|
||||||
|
|
||||||
@@ -64,7 +64,7 @@ services:
|
|||||||
<<: *common-env-pg
|
<<: *common-env-pg
|
||||||
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
|
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
|
||||||
PGPORT: 10432
|
PGPORT: 10432
|
||||||
command: ["postgres", "-p", "10432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
|
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
|
||||||
|
|
||||||
toxiproxy:
|
toxiproxy:
|
||||||
build: .
|
build: .
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
# This is an example of the most basic config
|
|
||||||
# that will mimic what PgBouncer does in transaction mode with one server.
|
|
||||||
|
|
||||||
[general]
|
|
||||||
|
|
||||||
host = "0.0.0.0"
|
|
||||||
port = 6433
|
|
||||||
admin_username = "pgcat"
|
|
||||||
admin_password = "pgcat"
|
|
||||||
|
|
||||||
[pools.pgml.users.0]
|
|
||||||
username = "postgres"
|
|
||||||
password = "postgres"
|
|
||||||
pool_size = 10
|
|
||||||
min_pool_size = 1
|
|
||||||
pool_mode = "transaction"
|
|
||||||
|
|
||||||
[pools.pgml.shards.0]
|
|
||||||
servers = [
|
|
||||||
["127.0.0.1", 28815, "primary"]
|
|
||||||
]
|
|
||||||
database = "postgres"
|
|
||||||
122
pgcat.toml
122
pgcat.toml
@@ -23,9 +23,6 @@ connect_timeout = 5000 # milliseconds
|
|||||||
# How long an idle connection with a server is left open (ms).
|
# How long an idle connection with a server is left open (ms).
|
||||||
idle_timeout = 30000 # milliseconds
|
idle_timeout = 30000 # milliseconds
|
||||||
|
|
||||||
# Max connection lifetime before it's closed, even if actively used.
|
|
||||||
server_lifetime = 86400000 # 24 hours
|
|
||||||
|
|
||||||
# How long a client is allowed to be idle while in a transaction (ms).
|
# How long a client is allowed to be idle while in a transaction (ms).
|
||||||
idle_client_in_transaction_timeout = 0 # milliseconds
|
idle_client_in_transaction_timeout = 0 # milliseconds
|
||||||
|
|
||||||
@@ -61,15 +58,9 @@ tcp_keepalives_count = 5
|
|||||||
tcp_keepalives_interval = 5
|
tcp_keepalives_interval = 5
|
||||||
|
|
||||||
# Path to TLS Certificate file to use for TLS connections
|
# Path to TLS Certificate file to use for TLS connections
|
||||||
# tls_certificate = ".circleci/server.cert"
|
# tls_certificate = "server.cert"
|
||||||
# Path to TLS private key file to use for TLS connections
|
# Path to TLS private key file to use for TLS connections
|
||||||
# tls_private_key = ".circleci/server.key"
|
# tls_private_key = "server.key"
|
||||||
|
|
||||||
# Enable/disable server TLS
|
|
||||||
server_tls = false
|
|
||||||
|
|
||||||
# Verify server certificate is completely authentic.
|
|
||||||
verify_server_certificate = false
|
|
||||||
|
|
||||||
# User name to access the virtual administrative database (pgbouncer or pgcat)
|
# User name to access the virtual administrative database (pgbouncer or pgcat)
|
||||||
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
|
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
|
||||||
@@ -77,58 +68,6 @@ admin_username = "admin_user"
|
|||||||
# Password to access the virtual administrative database
|
# Password to access the virtual administrative database
|
||||||
admin_password = "admin_pass"
|
admin_password = "admin_pass"
|
||||||
|
|
||||||
# Default plugins that are configured on all pools.
|
|
||||||
[plugins]
|
|
||||||
|
|
||||||
# Prewarmer plugin that runs queries on server startup, before giving the connection
|
|
||||||
# to the client.
|
|
||||||
[plugins.prewarmer]
|
|
||||||
enabled = false
|
|
||||||
queries = [
|
|
||||||
"SELECT pg_prewarm('pgbench_accounts')",
|
|
||||||
]
|
|
||||||
|
|
||||||
# Log all queries to stdout.
|
|
||||||
[plugins.query_logger]
|
|
||||||
enabled = false
|
|
||||||
|
|
||||||
# Block access to tables that Postgres does not allow us to control.
|
|
||||||
[plugins.table_access]
|
|
||||||
enabled = false
|
|
||||||
tables = [
|
|
||||||
"pg_user",
|
|
||||||
"pg_roles",
|
|
||||||
"pg_database",
|
|
||||||
]
|
|
||||||
|
|
||||||
# Intercept user queries and give a fake reply.
|
|
||||||
[plugins.intercept]
|
|
||||||
enabled = true
|
|
||||||
|
|
||||||
[plugins.intercept.queries.0]
|
|
||||||
|
|
||||||
query = "select current_database() as a, current_schemas(false) as b"
|
|
||||||
schema = [
|
|
||||||
["a", "text"],
|
|
||||||
["b", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "{public}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
[plugins.intercept.queries.1]
|
|
||||||
|
|
||||||
query = "select current_database(), current_schema(), current_user"
|
|
||||||
schema = [
|
|
||||||
["current_database", "text"],
|
|
||||||
["current_schema", "text"],
|
|
||||||
["current_user", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "public", "${USER}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
# pool configs are structured as pool.<pool_name>
|
# pool configs are structured as pool.<pool_name>
|
||||||
# the pool_name is what clients use as database name when connecting.
|
# the pool_name is what clients use as database name when connecting.
|
||||||
# For a pool named `sharded_db`, clients access that pool using connection string like
|
# For a pool named `sharded_db`, clients access that pool using connection string like
|
||||||
@@ -198,61 +137,6 @@ idle_timeout = 40000
|
|||||||
# Connect timeout can be overwritten in the pool
|
# Connect timeout can be overwritten in the pool
|
||||||
connect_timeout = 3000
|
connect_timeout = 3000
|
||||||
|
|
||||||
# When enabled, ip resolutions for server connections specified using hostnames will be cached
|
|
||||||
# and checked for changes every `dns_max_ttl` seconds. If a change in the host resolution is found
|
|
||||||
# old ip connections are closed (gracefully) and new connections will start using new ip.
|
|
||||||
# dns_cache_enabled = false
|
|
||||||
|
|
||||||
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
|
|
||||||
# dns_max_ttl = 30
|
|
||||||
|
|
||||||
# Plugins can be configured on a pool-per-pool basis. This overrides the global plugins setting,
|
|
||||||
# so all plugins have to be configured here again.
|
|
||||||
[pool.sharded_db.plugins]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.prewarmer]
|
|
||||||
enabled = true
|
|
||||||
queries = [
|
|
||||||
"SELECT pg_prewarm('pgbench_accounts')",
|
|
||||||
]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.query_logger]
|
|
||||||
enabled = false
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.table_access]
|
|
||||||
enabled = false
|
|
||||||
tables = [
|
|
||||||
"pg_user",
|
|
||||||
"pg_roles",
|
|
||||||
"pg_database",
|
|
||||||
]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.intercept]
|
|
||||||
enabled = true
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.intercept.queries.0]
|
|
||||||
|
|
||||||
query = "select current_database() as a, current_schemas(false) as b"
|
|
||||||
schema = [
|
|
||||||
["a", "text"],
|
|
||||||
["b", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "{public}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.intercept.queries.1]
|
|
||||||
|
|
||||||
query = "select current_database(), current_schema(), current_user"
|
|
||||||
schema = [
|
|
||||||
["current_database", "text"],
|
|
||||||
["current_schema", "text"],
|
|
||||||
["current_user", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "public", "${USER}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
# User configs are structured as pool.<pool_name>.users.<user_index>
|
# User configs are structured as pool.<pool_name>.users.<user_index>
|
||||||
# This section holds the credentials for users that may connect to this cluster
|
# This section holds the credentials for users that may connect to this cluster
|
||||||
[pools.sharded_db.users.0]
|
[pools.sharded_db.users.0]
|
||||||
@@ -322,8 +206,6 @@ sharding_function = "pg_bigint_hash"
|
|||||||
username = "simple_user"
|
username = "simple_user"
|
||||||
password = "simple_user"
|
password = "simple_user"
|
||||||
pool_size = 5
|
pool_size = 5
|
||||||
min_pool_size = 3
|
|
||||||
server_lifetime = 60000
|
|
||||||
statement_timeout = 0
|
statement_timeout = 0
|
||||||
|
|
||||||
[pools.simple_db.shards.0]
|
[pools.simple_db.shards.0]
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ use tokio::time::Instant;
|
|||||||
use crate::config::{get_config, reload_config, VERSION};
|
use crate::config::{get_config, reload_config, VERSION};
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::pool::ClientServerMap;
|
|
||||||
use crate::pool::{get_all_pools, get_pool};
|
use crate::pool::{get_all_pools, get_pool};
|
||||||
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
|
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
|
||||||
|
use crate::ClientServerMap;
|
||||||
|
|
||||||
pub fn generate_server_info_for_admin() -> BytesMut {
|
pub fn generate_server_info_for_admin() -> BytesMut {
|
||||||
let mut server_info = BytesMut::new();
|
let mut server_info = BytesMut::new();
|
||||||
|
|||||||
@@ -77,8 +77,6 @@ impl AuthPassthrough {
|
|||||||
pool_size: 1,
|
pool_size: 1,
|
||||||
statement_timeout: 0,
|
statement_timeout: 0,
|
||||||
pool_mode: None,
|
pool_mode: None,
|
||||||
server_lifetime: None,
|
|
||||||
min_pool_size: None,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let user = &address.username;
|
let user = &address.username;
|
||||||
|
|||||||
152
src/client.rs
152
src/client.rs
@@ -16,7 +16,6 @@ use crate::auth_passthrough::refetch_auth_hash;
|
|||||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::plugins::PluginOutput;
|
|
||||||
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
|
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
|
||||||
use crate::query_router::{Command, QueryRouter};
|
use crate::query_router::{Command, QueryRouter};
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
@@ -540,7 +539,6 @@ where
|
|||||||
Some(md5_hash_password(username, password, &salt))
|
Some(md5_hash_password(username, password, &salt))
|
||||||
} else {
|
} else {
|
||||||
if !get_config().is_auth_query_configured() {
|
if !get_config().is_auth_query_configured() {
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
return Err(Error::ClientAuthImpossible(username.into()));
|
return Err(Error::ClientAuthImpossible(username.into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -567,8 +565,6 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(Error::ClientAuthPassthroughError(
|
return Err(Error::ClientAuthPassthroughError(
|
||||||
err.to_string(),
|
err.to_string(),
|
||||||
client_identifier,
|
client_identifier,
|
||||||
@@ -591,15 +587,7 @@ where
|
|||||||
client_identifier
|
client_identifier
|
||||||
);
|
);
|
||||||
|
|
||||||
let fetched_hash = match refetch_auth_hash(&pool).await {
|
let fetched_hash = refetch_auth_hash(&pool).await?;
|
||||||
Ok(fetched_hash) => fetched_hash,
|
|
||||||
Err(err) => {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
||||||
|
|
||||||
// Ok password changed in server an auth is possible.
|
// Ok password changed in server an auth is possible.
|
||||||
@@ -766,9 +754,6 @@ where
|
|||||||
|
|
||||||
self.stats.register(self.stats.clone());
|
self.stats.register(self.stats.clone());
|
||||||
|
|
||||||
// Result returned by one of the plugins.
|
|
||||||
let mut plugin_output = None;
|
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
// or issue commands for our sharding and server selection protocol.
|
// or issue commands for our sharding and server selection protocol.
|
||||||
@@ -819,25 +804,7 @@ where
|
|||||||
|
|
||||||
'Q' => {
|
'Q' => {
|
||||||
if query_router.query_parser_enabled() {
|
if query_router.query_parser_enabled() {
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
query_router.infer(&message);
|
||||||
let plugin_result = query_router.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
match plugin_result {
|
|
||||||
Ok(PluginOutput::Deny(error)) => {
|
|
||||||
error_response(&mut self.write, &error).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(PluginOutput::Intercept(result)) => {
|
|
||||||
write_all(&mut self.write, result).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = query_router.infer(&ast);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -845,13 +812,7 @@ where
|
|||||||
self.buffer.put(&message[..]);
|
self.buffer.put(&message[..]);
|
||||||
|
|
||||||
if query_router.query_parser_enabled() {
|
if query_router.query_parser_enabled() {
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
query_router.infer(&message);
|
||||||
if let Ok(output) = query_router.execute_plugins(&ast).await {
|
|
||||||
plugin_output = Some(output);
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = query_router.infer(&ast);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
@@ -885,18 +846,6 @@ where
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check on plugin results.
|
|
||||||
match plugin_output {
|
|
||||||
Some(PluginOutput::Deny(error)) => {
|
|
||||||
self.buffer.clear();
|
|
||||||
error_response(&mut self.write, &error).await?;
|
|
||||||
plugin_output = None;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Get a pool instance referenced by the most up-to-date
|
// Get a pool instance referenced by the most up-to-date
|
||||||
// pointer. This ensures we always read the latest config
|
// pointer. This ensures we always read the latest config
|
||||||
// when starting a query.
|
// when starting a query.
|
||||||
@@ -983,7 +932,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Grab a server from the pool.
|
// Grab a server from the pool.
|
||||||
let connection = match pool
|
let mut connection = match pool
|
||||||
.get(query_router.shard(), query_router.role(), &self.stats)
|
.get(query_router.shard(), query_router.role(), &self.stats)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@@ -1026,9 +975,8 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut reference = connection.0;
|
let server = &mut *connection.0;
|
||||||
let address = connection.1;
|
let address = connection.1;
|
||||||
let server = &mut *reference;
|
|
||||||
|
|
||||||
// Server is assigned to the client in case the client wants to
|
// Server is assigned to the client in case the client wants to
|
||||||
// cancel a query later.
|
// cancel a query later.
|
||||||
@@ -1051,6 +999,7 @@ where
|
|||||||
|
|
||||||
// Set application_name.
|
// Set application_name.
|
||||||
server.set_name(&self.application_name).await?;
|
server.set_name(&self.application_name).await?;
|
||||||
|
server.switch_async(false);
|
||||||
|
|
||||||
let mut initial_message = Some(message);
|
let mut initial_message = Some(message);
|
||||||
|
|
||||||
@@ -1070,12 +1019,37 @@ where
|
|||||||
None => {
|
None => {
|
||||||
trace!("Waiting for message inside transaction or in session mode");
|
trace!("Waiting for message inside transaction or in session mode");
|
||||||
|
|
||||||
match tokio::time::timeout(
|
let message = tokio::select! {
|
||||||
idle_client_timeout_duration,
|
message = tokio::time::timeout(
|
||||||
read_message(&mut self.read),
|
idle_client_timeout_duration,
|
||||||
)
|
read_message(&mut self.read),
|
||||||
.await
|
) => message,
|
||||||
{
|
|
||||||
|
server_message = server.recv() => {
|
||||||
|
debug!("Got async message");
|
||||||
|
|
||||||
|
let server_message = match server_message {
|
||||||
|
Ok(message) => message,
|
||||||
|
Err(err) => {
|
||||||
|
pool.ban(&address, BanReason::MessageReceiveFailed, Some(&self.stats));
|
||||||
|
server.mark_bad();
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match write_all_half(&mut self.write, &server_message).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
server.mark_bad();
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match message {
|
||||||
Ok(Ok(message)) => message,
|
Ok(Ok(message)) => message,
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
// Client disconnected inside a transaction.
|
// Client disconnected inside a transaction.
|
||||||
@@ -1125,27 +1099,6 @@ where
|
|||||||
match code {
|
match code {
|
||||||
// Query
|
// Query
|
||||||
'Q' => {
|
'Q' => {
|
||||||
if query_router.query_parser_enabled() {
|
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
|
||||||
let plugin_result = query_router.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
match plugin_result {
|
|
||||||
Ok(PluginOutput::Deny(error)) => {
|
|
||||||
error_response(&mut self.write, &error).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(PluginOutput::Intercept(result)) => {
|
|
||||||
write_all(&mut self.write, result).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = query_router.infer(&ast);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
debug!("Sending query to server");
|
debug!("Sending query to server");
|
||||||
|
|
||||||
self.send_and_receive_loop(
|
self.send_and_receive_loop(
|
||||||
@@ -1185,14 +1138,6 @@ where
|
|||||||
// Parse
|
// Parse
|
||||||
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
||||||
'P' => {
|
'P' => {
|
||||||
if query_router.query_parser_enabled() {
|
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
|
||||||
if let Ok(output) = query_router.execute_plugins(&ast).await {
|
|
||||||
plugin_output = Some(output);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.buffer.put(&message[..]);
|
self.buffer.put(&message[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1221,26 +1166,13 @@ where
|
|||||||
|
|
||||||
// Sync
|
// Sync
|
||||||
// Frontend (client) is asking for the query result now.
|
// Frontend (client) is asking for the query result now.
|
||||||
'S' => {
|
'S' | 'H' => {
|
||||||
debug!("Sending query to server");
|
debug!("Sending query to server");
|
||||||
|
|
||||||
match plugin_output {
|
if code == 'H' {
|
||||||
Some(PluginOutput::Deny(error)) => {
|
server.switch_async(true);
|
||||||
error_response(&mut self.write, &error).await?;
|
debug!("Client requested flush, going async");
|
||||||
plugin_output = None;
|
}
|
||||||
self.buffer.clear();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(PluginOutput::Intercept(result)) => {
|
|
||||||
write_all(&mut self.write, result).await?;
|
|
||||||
plugin_output = None;
|
|
||||||
self.buffer.clear();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
self.buffer.put(&message[..]);
|
self.buffer.put(&message[..]);
|
||||||
|
|
||||||
|
|||||||
262
src/config.rs
262
src/config.rs
@@ -12,7 +12,6 @@ use std::sync::Arc;
|
|||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
use crate::dns_cache::CachedResolver;
|
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::pool::{ClientServerMap, ConnectionPool};
|
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||||
use crate::sharding::ShardingFunction;
|
use crate::sharding::ShardingFunction;
|
||||||
@@ -122,16 +121,6 @@ impl Default for Address {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for Address {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"[address: {}:{}][database: {}][user: {}]",
|
|
||||||
self.host, self.port, self.database, self.username
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We need to implement PartialEq by ourselves so we skip stats in the comparison
|
// We need to implement PartialEq by ourselves so we skip stats in the comparison
|
||||||
impl PartialEq for Address {
|
impl PartialEq for Address {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
@@ -192,9 +181,7 @@ pub struct User {
|
|||||||
pub server_username: Option<String>,
|
pub server_username: Option<String>,
|
||||||
pub server_password: Option<String>,
|
pub server_password: Option<String>,
|
||||||
pub pool_size: u32,
|
pub pool_size: u32,
|
||||||
pub min_pool_size: Option<u32>,
|
|
||||||
pub pool_mode: Option<PoolMode>,
|
pub pool_mode: Option<PoolMode>,
|
||||||
pub server_lifetime: Option<u64>,
|
|
||||||
#[serde(default)] // 0
|
#[serde(default)] // 0
|
||||||
pub statement_timeout: u64,
|
pub statement_timeout: u64,
|
||||||
}
|
}
|
||||||
@@ -207,34 +194,12 @@ impl Default for User {
|
|||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
pool_size: 15,
|
pool_size: 15,
|
||||||
min_pool_size: None,
|
|
||||||
statement_timeout: 0,
|
statement_timeout: 0,
|
||||||
pool_mode: None,
|
pool_mode: None,
|
||||||
server_lifetime: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl User {
|
|
||||||
fn validate(&self) -> Result<(), Error> {
|
|
||||||
match self.min_pool_size {
|
|
||||||
Some(min_pool_size) => {
|
|
||||||
if min_pool_size > self.pool_size {
|
|
||||||
error!(
|
|
||||||
"min_pool_size of {} cannot be larger than pool_size of {}",
|
|
||||||
min_pool_size, self.pool_size
|
|
||||||
);
|
|
||||||
return Err(Error::BadConfig);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
None => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// General configuration.
|
/// General configuration.
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct General {
|
pub struct General {
|
||||||
@@ -245,8 +210,6 @@ pub struct General {
|
|||||||
pub port: u16,
|
pub port: u16,
|
||||||
|
|
||||||
pub enable_prometheus_exporter: Option<bool>,
|
pub enable_prometheus_exporter: Option<bool>,
|
||||||
|
|
||||||
#[serde(default = "General::default_prometheus_exporter_port")]
|
|
||||||
pub prometheus_exporter_port: i16,
|
pub prometheus_exporter_port: i16,
|
||||||
|
|
||||||
#[serde(default = "General::default_connect_timeout")]
|
#[serde(default = "General::default_connect_timeout")]
|
||||||
@@ -268,12 +231,6 @@ pub struct General {
|
|||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
pub log_client_disconnections: bool,
|
pub log_client_disconnections: bool,
|
||||||
|
|
||||||
#[serde(default)] // False
|
|
||||||
pub dns_cache_enabled: bool,
|
|
||||||
|
|
||||||
#[serde(default = "General::default_dns_max_ttl")]
|
|
||||||
pub dns_max_ttl: u64,
|
|
||||||
|
|
||||||
#[serde(default = "General::default_shutdown_timeout")]
|
#[serde(default = "General::default_shutdown_timeout")]
|
||||||
pub shutdown_timeout: u64,
|
pub shutdown_timeout: u64,
|
||||||
|
|
||||||
@@ -289,9 +246,6 @@ pub struct General {
|
|||||||
#[serde(default = "General::default_idle_client_in_transaction_timeout")]
|
#[serde(default = "General::default_idle_client_in_transaction_timeout")]
|
||||||
pub idle_client_in_transaction_timeout: u64,
|
pub idle_client_in_transaction_timeout: u64,
|
||||||
|
|
||||||
#[serde(default = "General::default_server_lifetime")]
|
|
||||||
pub server_lifetime: u64,
|
|
||||||
|
|
||||||
#[serde(default = "General::default_worker_threads")]
|
#[serde(default = "General::default_worker_threads")]
|
||||||
pub worker_threads: usize,
|
pub worker_threads: usize,
|
||||||
|
|
||||||
@@ -300,20 +254,9 @@ pub struct General {
|
|||||||
|
|
||||||
pub tls_certificate: Option<String>,
|
pub tls_certificate: Option<String>,
|
||||||
pub tls_private_key: Option<String>,
|
pub tls_private_key: Option<String>,
|
||||||
|
|
||||||
#[serde(default)] // false
|
|
||||||
pub server_tls: bool,
|
|
||||||
|
|
||||||
#[serde(default)] // false
|
|
||||||
pub verify_server_certificate: bool,
|
|
||||||
|
|
||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
|
|
||||||
#[serde(default = "General::default_validate_config")]
|
|
||||||
pub validate_config: bool,
|
|
||||||
|
|
||||||
// Support for auth query
|
|
||||||
pub auth_query: Option<String>,
|
pub auth_query: Option<String>,
|
||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
@@ -328,10 +271,6 @@ impl General {
|
|||||||
5432
|
5432
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_server_lifetime() -> u64 {
|
|
||||||
1000 * 60 * 60 * 24 // 24 hours
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_connect_timeout() -> u64 {
|
pub fn default_connect_timeout() -> u64 {
|
||||||
1000
|
1000
|
||||||
}
|
}
|
||||||
@@ -359,10 +298,6 @@ impl General {
|
|||||||
60000
|
60000
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_dns_max_ttl() -> u64 {
|
|
||||||
30
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_healthcheck_timeout() -> u64 {
|
pub fn default_healthcheck_timeout() -> u64 {
|
||||||
1000
|
1000
|
||||||
}
|
}
|
||||||
@@ -382,14 +317,6 @@ impl General {
|
|||||||
pub fn default_idle_client_in_transaction_timeout() -> u64 {
|
pub fn default_idle_client_in_transaction_timeout() -> u64 {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_validate_config() -> bool {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_prometheus_exporter_port() -> i16 {
|
|
||||||
9930
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for General {
|
impl Default for General {
|
||||||
@@ -413,19 +340,13 @@ impl Default for General {
|
|||||||
log_client_connections: false,
|
log_client_connections: false,
|
||||||
log_client_disconnections: false,
|
log_client_disconnections: false,
|
||||||
autoreload: None,
|
autoreload: None,
|
||||||
dns_cache_enabled: false,
|
|
||||||
dns_max_ttl: Self::default_dns_max_ttl(),
|
|
||||||
tls_certificate: None,
|
tls_certificate: None,
|
||||||
tls_private_key: None,
|
tls_private_key: None,
|
||||||
server_tls: false,
|
|
||||||
verify_server_certificate: false,
|
|
||||||
admin_username: String::from("admin"),
|
admin_username: String::from("admin"),
|
||||||
admin_password: String::from("admin"),
|
admin_password: String::from("admin"),
|
||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
server_lifetime: 1000 * 3600 * 24, // 24 hours,
|
|
||||||
validate_config: true,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -478,7 +399,6 @@ pub struct Pool {
|
|||||||
#[serde(default = "Pool::default_load_balancing_mode")]
|
#[serde(default = "Pool::default_load_balancing_mode")]
|
||||||
pub load_balancing_mode: LoadBalancingMode,
|
pub load_balancing_mode: LoadBalancingMode,
|
||||||
|
|
||||||
#[serde(default = "Pool::default_default_role")]
|
|
||||||
pub default_role: String,
|
pub default_role: String,
|
||||||
|
|
||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
@@ -491,9 +411,6 @@ pub struct Pool {
|
|||||||
|
|
||||||
pub idle_timeout: Option<u64>,
|
pub idle_timeout: Option<u64>,
|
||||||
|
|
||||||
pub server_lifetime: Option<u64>,
|
|
||||||
|
|
||||||
#[serde(default = "Pool::default_sharding_function")]
|
|
||||||
pub sharding_function: ShardingFunction,
|
pub sharding_function: ShardingFunction,
|
||||||
|
|
||||||
#[serde(default = "Pool::default_automatic_sharding_key")]
|
#[serde(default = "Pool::default_automatic_sharding_key")]
|
||||||
@@ -507,7 +424,6 @@ pub struct Pool {
|
|||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
|
|
||||||
pub plugins: Option<Plugins>,
|
|
||||||
pub shards: BTreeMap<String, Shard>,
|
pub shards: BTreeMap<String, Shard>,
|
||||||
pub users: BTreeMap<String, User>,
|
pub users: BTreeMap<String, User>,
|
||||||
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
|
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
|
||||||
@@ -540,14 +456,6 @@ impl Pool {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_default_role() -> String {
|
|
||||||
"any".into()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_sharding_function() -> ShardingFunction {
|
|
||||||
ShardingFunction::PgBigintHash
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn validate(&mut self) -> Result<(), Error> {
|
pub fn validate(&mut self) -> Result<(), Error> {
|
||||||
match self.default_role.as_ref() {
|
match self.default_role.as_ref() {
|
||||||
"any" => (),
|
"any" => (),
|
||||||
@@ -607,10 +515,6 @@ impl Pool {
|
|||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
for (_, user) in &self.users {
|
|
||||||
user.validate()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -635,8 +539,6 @@ impl Default for Pool {
|
|||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
server_lifetime: None,
|
|
||||||
plugins: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -686,7 +588,7 @@ impl Shard {
|
|||||||
|
|
||||||
if primary_count > 1 {
|
if primary_count > 1 {
|
||||||
error!(
|
error!(
|
||||||
"Shard {} has more than one primary configured",
|
"Shard {} has more than on primary configured",
|
||||||
self.database
|
self.database
|
||||||
);
|
);
|
||||||
return Err(Error::BadConfig);
|
return Err(Error::BadConfig);
|
||||||
@@ -715,76 +617,6 @@ impl Default for Shard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Plugins {
|
|
||||||
pub intercept: Option<Intercept>,
|
|
||||||
pub table_access: Option<TableAccess>,
|
|
||||||
pub query_logger: Option<QueryLogger>,
|
|
||||||
pub prewarmer: Option<Prewarmer>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for Plugins {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}",
|
|
||||||
self.intercept.is_some(),
|
|
||||||
self.table_access.is_some(),
|
|
||||||
self.query_logger.is_some(),
|
|
||||||
self.prewarmer.is_some(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Intercept {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub queries: BTreeMap<String, Query>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct TableAccess {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub tables: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct QueryLogger {
|
|
||||||
pub enabled: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Prewarmer {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub queries: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Intercept {
|
|
||||||
pub fn substitute(&mut self, db: &str, user: &str) {
|
|
||||||
for (_, query) in self.queries.iter_mut() {
|
|
||||||
query.substitute(db, user);
|
|
||||||
query.query = query.query.to_ascii_lowercase();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Query {
|
|
||||||
pub query: String,
|
|
||||||
pub schema: Vec<Vec<String>>,
|
|
||||||
pub result: Vec<Vec<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Query {
|
|
||||||
pub fn substitute(&mut self, db: &str, user: &str) {
|
|
||||||
for col in self.result.iter_mut() {
|
|
||||||
for i in 0..col.len() {
|
|
||||||
col[i] = col[i].replace("${USER}", user).replace("${DATABASE}", db);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Configuration wrapper.
|
/// Configuration wrapper.
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -802,13 +634,7 @@ pub struct Config {
|
|||||||
#[serde(default = "Config::default_path")]
|
#[serde(default = "Config::default_path")]
|
||||||
pub path: String,
|
pub path: String,
|
||||||
|
|
||||||
// General and global settings.
|
|
||||||
pub general: General,
|
pub general: General,
|
||||||
|
|
||||||
// Plugins that should run in all pools.
|
|
||||||
pub plugins: Option<Plugins>,
|
|
||||||
|
|
||||||
// Connection pools.
|
|
||||||
pub pools: HashMap<String, Pool>,
|
pub pools: HashMap<String, Pool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -846,7 +672,6 @@ impl Default for Config {
|
|||||||
path: Self::default_path(),
|
path: Self::default_path(),
|
||||||
general: General::default(),
|
general: General::default(),
|
||||||
pools: HashMap::default(),
|
pools: HashMap::default(),
|
||||||
plugins: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -966,10 +791,6 @@ impl Config {
|
|||||||
);
|
);
|
||||||
info!("Shutdown timeout: {}ms", self.general.shutdown_timeout);
|
info!("Shutdown timeout: {}ms", self.general.shutdown_timeout);
|
||||||
info!("Healthcheck delay: {}ms", self.general.healthcheck_delay);
|
info!("Healthcheck delay: {}ms", self.general.healthcheck_delay);
|
||||||
info!(
|
|
||||||
"Default max server lifetime: {}ms",
|
|
||||||
self.general.server_lifetime
|
|
||||||
);
|
|
||||||
match self.general.tls_certificate.clone() {
|
match self.general.tls_certificate.clone() {
|
||||||
Some(tls_certificate) => {
|
Some(tls_certificate) => {
|
||||||
info!("TLS certificate: {}", tls_certificate);
|
info!("TLS certificate: {}", tls_certificate);
|
||||||
@@ -988,18 +809,6 @@ impl Config {
|
|||||||
info!("TLS support is disabled");
|
info!("TLS support is disabled");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
info!("Server TLS enabled: {}", self.general.server_tls);
|
|
||||||
info!(
|
|
||||||
"Server TLS certificate verification: {}",
|
|
||||||
self.general.verify_server_certificate
|
|
||||||
);
|
|
||||||
info!(
|
|
||||||
"Plugins: {}",
|
|
||||||
match self.plugins {
|
|
||||||
Some(ref plugins) => plugins.to_string(),
|
|
||||||
None => "not configured".into(),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
for (pool_name, pool_config) in &self.pools {
|
for (pool_name, pool_config) in &self.pools {
|
||||||
// TODO: Make this output prettier (maybe a table?)
|
// TODO: Make this output prettier (maybe a table?)
|
||||||
@@ -1058,34 +867,12 @@ impl Config {
|
|||||||
pool_name,
|
pool_name,
|
||||||
pool_config.users.len()
|
pool_config.users.len()
|
||||||
);
|
);
|
||||||
info!(
|
|
||||||
"[pool: {}] Max server lifetime: {}",
|
|
||||||
pool_name,
|
|
||||||
match pool_config.server_lifetime {
|
|
||||||
Some(server_lifetime) => format!("{}ms", server_lifetime),
|
|
||||||
None => "default".to_string(),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
info!(
|
|
||||||
"[pool: {}] Plugins: {}",
|
|
||||||
pool_name,
|
|
||||||
match pool_config.plugins {
|
|
||||||
Some(ref plugins) => plugins.to_string(),
|
|
||||||
None => "not configured".into(),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
for user in &pool_config.users {
|
for user in &pool_config.users {
|
||||||
info!(
|
info!(
|
||||||
"[pool: {}][user: {}] Pool size: {}",
|
"[pool: {}][user: {}] Pool size: {}",
|
||||||
pool_name, user.1.username, user.1.pool_size,
|
pool_name, user.1.username, user.1.pool_size,
|
||||||
);
|
);
|
||||||
info!(
|
|
||||||
"[pool: {}][user: {}] Minimum pool size: {}",
|
|
||||||
pool_name,
|
|
||||||
user.1.username,
|
|
||||||
user.1.min_pool_size.unwrap_or(0)
|
|
||||||
);
|
|
||||||
info!(
|
info!(
|
||||||
"[pool: {}][user: {}] Statement timeout: {}",
|
"[pool: {}][user: {}] Statement timeout: {}",
|
||||||
pool_name, user.1.username, user.1.statement_timeout
|
pool_name, user.1.username, user.1.statement_timeout
|
||||||
@@ -1099,15 +886,6 @@ impl Config {
|
|||||||
None => pool_config.pool_mode.to_string(),
|
None => pool_config.pool_mode.to_string(),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
info!(
|
|
||||||
"[pool: {}][user: {}] Max server lifetime: {}",
|
|
||||||
pool_name,
|
|
||||||
user.1.username,
|
|
||||||
match user.1.server_lifetime {
|
|
||||||
Some(server_lifetime) => format!("{}ms", server_lifetime),
|
|
||||||
None => "default".to_string(),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1118,13 +896,7 @@ impl Config {
|
|||||||
&& (self.general.auth_query_user.is_none()
|
&& (self.general.auth_query_user.is_none()
|
||||||
|| self.general.auth_query_password.is_none())
|
|| self.general.auth_query_password.is_none())
|
||||||
{
|
{
|
||||||
error!(
|
error!("If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password`");
|
||||||
"If auth_query is specified, \
|
|
||||||
you need to provide a value \
|
|
||||||
for `auth_query_user`, \
|
|
||||||
`auth_query_password`"
|
|
||||||
);
|
|
||||||
|
|
||||||
return Err(Error::BadConfig);
|
return Err(Error::BadConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1132,14 +904,7 @@ impl Config {
|
|||||||
if pool.auth_query.is_some()
|
if pool.auth_query.is_some()
|
||||||
&& (pool.auth_query_user.is_none() || pool.auth_query_password.is_none())
|
&& (pool.auth_query_user.is_none() || pool.auth_query_password.is_none())
|
||||||
{
|
{
|
||||||
error!(
|
error!("Error in pool {{ {} }}. If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password`", name);
|
||||||
"Error in pool {{ {} }}. \
|
|
||||||
If auth_query is specified, you need \
|
|
||||||
to provide a value for `auth_query_user`, \
|
|
||||||
`auth_query_password`",
|
|
||||||
name
|
|
||||||
);
|
|
||||||
|
|
||||||
return Err(Error::BadConfig);
|
return Err(Error::BadConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1149,13 +914,7 @@ impl Config {
|
|||||||
|| pool.auth_query_user.is_none())
|
|| pool.auth_query_user.is_none())
|
||||||
&& user_data.password.is_none()
|
&& user_data.password.is_none()
|
||||||
{
|
{
|
||||||
error!(
|
error!("Error in pool {{ {} }}. You have to specify a user password for every pool if auth_query is not specified", name);
|
||||||
"Error in pool {{ {} }}. \
|
|
||||||
You have to specify a user password \
|
|
||||||
for every pool if auth_query is not specified",
|
|
||||||
name
|
|
||||||
);
|
|
||||||
|
|
||||||
return Err(Error::BadConfig);
|
return Err(Error::BadConfig);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1253,7 +1012,6 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
|||||||
|
|
||||||
pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, Error> {
|
pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, Error> {
|
||||||
let old_config = get_config();
|
let old_config = get_config();
|
||||||
|
|
||||||
match parse(&old_config.path).await {
|
match parse(&old_config.path).await {
|
||||||
Ok(()) => (),
|
Ok(()) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -1261,18 +1019,14 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E
|
|||||||
return Err(Error::BadConfig);
|
return Err(Error::BadConfig);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_config = get_config();
|
let new_config = get_config();
|
||||||
|
|
||||||
match CachedResolver::from_config().await {
|
if old_config.pools != new_config.pools {
|
||||||
Ok(_) => (),
|
info!("Pool configuration changed");
|
||||||
Err(err) => error!("DNS cache reinitialization error: {:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
if old_config != new_config {
|
|
||||||
info!("Config changed, reloading");
|
|
||||||
ConnectionPool::from_config(client_server_map).await?;
|
ConnectionPool::from_config(client_server_map).await?;
|
||||||
Ok(true)
|
Ok(true)
|
||||||
|
} else if old_config != new_config {
|
||||||
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|||||||
410
src/dns_cache.rs
410
src/dns_cache.rs
@@ -1,410 +0,0 @@
|
|||||||
use crate::config::get_config;
|
|
||||||
use crate::errors::Error;
|
|
||||||
use arc_swap::ArcSwap;
|
|
||||||
use log::{debug, error, info, warn};
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use std::collections::{HashMap, HashSet};
|
|
||||||
use std::io;
|
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::RwLock;
|
|
||||||
use tokio::time::{sleep, Duration};
|
|
||||||
use trust_dns_resolver::error::{ResolveError, ResolveResult};
|
|
||||||
use trust_dns_resolver::lookup_ip::LookupIp;
|
|
||||||
use trust_dns_resolver::TokioAsyncResolver;
|
|
||||||
|
|
||||||
/// Cached Resolver Globally available
|
|
||||||
pub static CACHED_RESOLVER: Lazy<ArcSwap<CachedResolver>> =
|
|
||||||
Lazy::new(|| ArcSwap::from_pointee(CachedResolver::default()));
|
|
||||||
|
|
||||||
// Ip addressed are returned as a set of addresses
|
|
||||||
// so we can compare.
|
|
||||||
#[derive(Clone, PartialEq, Debug)]
|
|
||||||
pub struct AddrSet {
|
|
||||||
set: HashSet<IpAddr>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AddrSet {
|
|
||||||
fn new() -> AddrSet {
|
|
||||||
AddrSet {
|
|
||||||
set: HashSet::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<LookupIp> for AddrSet {
|
|
||||||
fn from(lookup_ip: LookupIp) -> Self {
|
|
||||||
let mut addr_set = AddrSet::new();
|
|
||||||
for address in lookup_ip.iter() {
|
|
||||||
addr_set.set.insert(address);
|
|
||||||
}
|
|
||||||
addr_set
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
///
|
|
||||||
/// A CachedResolver is a DNS resolution cache mechanism with customizable expiration time.
|
|
||||||
///
|
|
||||||
/// The system works as follows:
|
|
||||||
///
|
|
||||||
/// When a host is to be resolved, if we have not resolved it before, a new resolution is
|
|
||||||
/// executed and stored in the internal cache. Concurrently, every `dns_max_ttl` time, the
|
|
||||||
/// cache is refreshed.
|
|
||||||
///
|
|
||||||
/// # Example:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
|
|
||||||
///
|
|
||||||
/// # tokio_test::block_on(async {
|
|
||||||
/// let config = CachedResolverConfig::default();
|
|
||||||
/// let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
/// let addrset = resolver.lookup_ip("www.example.com.").await.unwrap();
|
|
||||||
/// # })
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// // Now the ip resolution is stored in local cache and subsequent
|
|
||||||
/// // calls will be returned from cache. Also, the cache is refreshed
|
|
||||||
/// // and updated every 10 seconds.
|
|
||||||
///
|
|
||||||
/// // You can now check if an 'old' lookup differs from what it's currently
|
|
||||||
/// // store in cache by using `has_changed`.
|
|
||||||
/// resolver.has_changed("www.example.com.", addrset)
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct CachedResolver {
|
|
||||||
// The configuration of the cached_resolver.
|
|
||||||
config: CachedResolverConfig,
|
|
||||||
|
|
||||||
// This is the hash that contains the hash.
|
|
||||||
data: Option<RwLock<HashMap<String, AddrSet>>>,
|
|
||||||
|
|
||||||
// The resolver to be used for DNS queries.
|
|
||||||
resolver: Option<TokioAsyncResolver>,
|
|
||||||
|
|
||||||
// The RefreshLoop
|
|
||||||
refresh_loop: RwLock<Option<tokio::task::JoinHandle<()>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
///
|
|
||||||
/// Configuration
|
|
||||||
#[derive(Clone, Debug, Default, PartialEq)]
|
|
||||||
pub struct CachedResolverConfig {
|
|
||||||
/// Amount of time in secods that a resolved dns address is considered stale.
|
|
||||||
dns_max_ttl: u64,
|
|
||||||
|
|
||||||
/// Enabled or disabled? (this is so we can reload config)
|
|
||||||
enabled: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CachedResolverConfig {
|
|
||||||
fn new(dns_max_ttl: u64, enabled: bool) -> Self {
|
|
||||||
CachedResolverConfig {
|
|
||||||
dns_max_ttl,
|
|
||||||
enabled,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<crate::config::Config> for CachedResolverConfig {
|
|
||||||
fn from(config: crate::config::Config) -> Self {
|
|
||||||
CachedResolverConfig::new(config.general.dns_max_ttl, config.general.dns_cache_enabled)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CachedResolver {
|
|
||||||
///
|
|
||||||
/// Returns a new Arc<CachedResolver> based on passed configuration.
|
|
||||||
/// It also starts the loop that will refresh cache entries.
|
|
||||||
///
|
|
||||||
/// # Arguments:
|
|
||||||
///
|
|
||||||
/// * `config` - The `CachedResolverConfig` to be used to create the resolver.
|
|
||||||
///
|
|
||||||
/// # Example:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
|
|
||||||
///
|
|
||||||
/// # tokio_test::block_on(async {
|
|
||||||
/// let config = CachedResolverConfig::default();
|
|
||||||
/// let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
/// # })
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
pub async fn new(
|
|
||||||
config: CachedResolverConfig,
|
|
||||||
data: Option<HashMap<String, AddrSet>>,
|
|
||||||
) -> Result<Arc<Self>, io::Error> {
|
|
||||||
// Construct a new Resolver with default configuration options
|
|
||||||
let resolver = Some(TokioAsyncResolver::tokio_from_system_conf()?);
|
|
||||||
|
|
||||||
let data = if let Some(hash) = data {
|
|
||||||
Some(RwLock::new(hash))
|
|
||||||
} else {
|
|
||||||
Some(RwLock::new(HashMap::new()))
|
|
||||||
};
|
|
||||||
|
|
||||||
let instance = Arc::new(Self {
|
|
||||||
config,
|
|
||||||
resolver,
|
|
||||||
data,
|
|
||||||
refresh_loop: RwLock::new(None),
|
|
||||||
});
|
|
||||||
|
|
||||||
if instance.enabled() {
|
|
||||||
info!("Scheduling DNS refresh loop");
|
|
||||||
let refresh_loop = tokio::task::spawn({
|
|
||||||
let instance = instance.clone();
|
|
||||||
async move {
|
|
||||||
instance.refresh_dns_entries_loop().await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
*(instance.refresh_loop.write().unwrap()) = Some(refresh_loop);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(instance)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn enabled(&self) -> bool {
|
|
||||||
self.config.enabled
|
|
||||||
}
|
|
||||||
|
|
||||||
// Schedules the refresher
|
|
||||||
async fn refresh_dns_entries_loop(&self) {
|
|
||||||
let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap();
|
|
||||||
let interval = Duration::from_secs(self.config.dns_max_ttl);
|
|
||||||
loop {
|
|
||||||
debug!("Begin refreshing cached DNS addresses.");
|
|
||||||
// To minimize the time we hold the lock, we first create
|
|
||||||
// an array with keys.
|
|
||||||
let mut hostnames: Vec<String> = Vec::new();
|
|
||||||
{
|
|
||||||
if let Some(ref data) = self.data {
|
|
||||||
for hostname in data.read().unwrap().keys() {
|
|
||||||
hostnames.push(hostname.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for hostname in hostnames.iter() {
|
|
||||||
let addrset = self
|
|
||||||
.fetch_from_cache(hostname.as_str())
|
|
||||||
.expect("Could not obtain expected address from cache, this should not happen");
|
|
||||||
|
|
||||||
match resolver.lookup_ip(hostname).await {
|
|
||||||
Ok(lookup_ip) => {
|
|
||||||
let new_addrset = AddrSet::from(lookup_ip);
|
|
||||||
debug!(
|
|
||||||
"Obtained address for host ({}) -> ({:?})",
|
|
||||||
hostname, new_addrset
|
|
||||||
);
|
|
||||||
|
|
||||||
if addrset != new_addrset {
|
|
||||||
debug!(
|
|
||||||
"Addr changed from {:?} to {:?} updating cache.",
|
|
||||||
addrset, new_addrset
|
|
||||||
);
|
|
||||||
self.store_in_cache(hostname, new_addrset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!(
|
|
||||||
"There was an error trying to resolv {}: ({}).",
|
|
||||||
hostname, err
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
debug!("Finished refreshing cached DNS addresses.");
|
|
||||||
sleep(interval).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a `AddrSet` given the specified hostname.
|
|
||||||
///
|
|
||||||
/// This method first tries to fetch the value from the cache, if it misses
|
|
||||||
/// then it is resolved and stored in the cache. TTL from records is ignored.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `host` - A string slice referencing the hostname to be resolved.
|
|
||||||
///
|
|
||||||
/// # Example:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
|
|
||||||
///
|
|
||||||
/// # tokio_test::block_on(async {
|
|
||||||
/// let config = CachedResolverConfig::default();
|
|
||||||
/// let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
/// let response = resolver.lookup_ip("www.google.com.");
|
|
||||||
/// # })
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
pub async fn lookup_ip(&self, host: &str) -> ResolveResult<AddrSet> {
|
|
||||||
debug!("Lookup up {} in cache", host);
|
|
||||||
match self.fetch_from_cache(host) {
|
|
||||||
Some(addr_set) => {
|
|
||||||
debug!("Cache hit!");
|
|
||||||
Ok(addr_set)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
debug!("Not found, executing a dns query!");
|
|
||||||
if let Some(ref resolver) = self.resolver {
|
|
||||||
let addr_set = AddrSet::from(resolver.lookup_ip(host).await?);
|
|
||||||
debug!("Obtained: {:?}", addr_set);
|
|
||||||
self.store_in_cache(host, addr_set.clone());
|
|
||||||
Ok(addr_set)
|
|
||||||
} else {
|
|
||||||
Err(ResolveError::from("No resolver available"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Returns true if the stored host resolution differs from the AddrSet passed.
|
|
||||||
pub fn has_changed(&self, host: &str, addr_set: &AddrSet) -> bool {
|
|
||||||
if let Some(fetched_addr_set) = self.fetch_from_cache(host) {
|
|
||||||
return fetched_addr_set != *addr_set;
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetches an AddrSet from the inner cache adquiring the read lock.
|
|
||||||
fn fetch_from_cache(&self, key: &str) -> Option<AddrSet> {
|
|
||||||
if let Some(ref hash) = self.data {
|
|
||||||
if let Some(addr_set) = hash.read().unwrap().get(key) {
|
|
||||||
return Some(addr_set.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets up the global CACHED_RESOLVER static variable so we can globally use DNS
|
|
||||||
// cache.
|
|
||||||
pub async fn from_config() -> Result<(), Error> {
|
|
||||||
let cached_resolver = CACHED_RESOLVER.load();
|
|
||||||
let desired_config = CachedResolverConfig::from(get_config());
|
|
||||||
|
|
||||||
if cached_resolver.config != desired_config {
|
|
||||||
if let Some(ref refresh_loop) = *(cached_resolver.refresh_loop.write().unwrap()) {
|
|
||||||
warn!("Killing Dnscache refresh loop as its configuration is being reloaded");
|
|
||||||
refresh_loop.abort()
|
|
||||||
}
|
|
||||||
let new_resolver = if let Some(ref data) = cached_resolver.data {
|
|
||||||
let data = Some(data.read().unwrap().clone());
|
|
||||||
CachedResolver::new(desired_config, data).await
|
|
||||||
} else {
|
|
||||||
CachedResolver::new(desired_config, None).await
|
|
||||||
};
|
|
||||||
|
|
||||||
match new_resolver {
|
|
||||||
Ok(ok) => {
|
|
||||||
CACHED_RESOLVER.store(ok);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let message = format!("Error setting up cached_resolver. Error: {:?}, will continue without this feature.", err);
|
|
||||||
Err(Error::DNSCachedError(message))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stores the AddrSet in cache adquiring the write lock.
|
|
||||||
fn store_in_cache(&self, host: &str, addr_set: AddrSet) {
|
|
||||||
if let Some(ref data) = self.data {
|
|
||||||
data.write().unwrap().insert(host.to_string(), addr_set);
|
|
||||||
} else {
|
|
||||||
error!("Could not insert, Hash not initialized");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use trust_dns_resolver::error::ResolveError;
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn new() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await;
|
|
||||||
assert!(resolver.is_ok());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn lookup_ip() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let response = resolver.lookup_ip("www.google.com.").await;
|
|
||||||
assert!(response.is_ok());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn has_changed() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "www.google.com.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
let addr_set = response.unwrap();
|
|
||||||
assert!(!resolver.has_changed(hostname, &addr_set));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn unknown_host() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "www.idontexists.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
assert!(matches!(response, Err(ResolveError { .. })));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn incorrect_address() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "w ww.idontexists.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
assert!(matches!(response, Err(ResolveError { .. })));
|
|
||||||
assert!(!resolver.has_changed(hostname, &AddrSet::new()));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
// Ok, this test is based on the fact that google does DNS RR
|
|
||||||
// and does not responds with every available ip everytime, so
|
|
||||||
// if I cache here, it will miss after one cache iteration or two.
|
|
||||||
async fn thread() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "www.google.com.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
let addr_set = response.unwrap();
|
|
||||||
assert!(!resolver.has_changed(hostname, &addr_set));
|
|
||||||
let resolver_for_refresher = resolver.clone();
|
|
||||||
let _thread_handle = tokio::task::spawn(async move {
|
|
||||||
resolver_for_refresher.refresh_dns_entries_loop().await;
|
|
||||||
});
|
|
||||||
assert!(!resolver.has_changed(hostname, &addr_set));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
//! Errors.
|
//! Errors.
|
||||||
|
|
||||||
/// Various errors.
|
/// Various errors.
|
||||||
#[derive(Debug, PartialEq, Clone)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
SocketError(String),
|
SocketError(String),
|
||||||
ClientSocketError(String, ClientIdentifier),
|
ClientSocketError(String, ClientIdentifier),
|
||||||
@@ -19,13 +19,10 @@ pub enum Error {
|
|||||||
ClientError(String),
|
ClientError(String),
|
||||||
TlsError,
|
TlsError,
|
||||||
StatementTimeout,
|
StatementTimeout,
|
||||||
DNSCachedError(String),
|
|
||||||
ShuttingDown,
|
ShuttingDown,
|
||||||
ParseBytesError(String),
|
ParseBytesError(String),
|
||||||
AuthError(String),
|
AuthError(String),
|
||||||
AuthPassthroughError(String),
|
AuthPassthroughError(String),
|
||||||
UnsupportedStatement,
|
|
||||||
QueryRouterParserError(String),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Debug)]
|
#[derive(Clone, PartialEq, Debug)]
|
||||||
|
|||||||
@@ -1,17 +1,11 @@
|
|||||||
pub mod admin;
|
|
||||||
pub mod auth_passthrough;
|
pub mod auth_passthrough;
|
||||||
pub mod client;
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod constants;
|
pub mod constants;
|
||||||
pub mod dns_cache;
|
|
||||||
pub mod errors;
|
pub mod errors;
|
||||||
pub mod messages;
|
pub mod messages;
|
||||||
pub mod mirrors;
|
pub mod mirrors;
|
||||||
pub mod multi_logger;
|
pub mod multi_logger;
|
||||||
pub mod plugins;
|
|
||||||
pub mod pool;
|
pub mod pool;
|
||||||
pub mod prometheus;
|
|
||||||
pub mod query_router;
|
|
||||||
pub mod scram;
|
pub mod scram;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
pub mod sharding;
|
pub mod sharding;
|
||||||
|
|||||||
46
src/main.rs
46
src/main.rs
@@ -36,7 +36,6 @@ extern crate sqlparser;
|
|||||||
extern crate tokio;
|
extern crate tokio;
|
||||||
extern crate tokio_rustls;
|
extern crate tokio_rustls;
|
||||||
extern crate toml;
|
extern crate toml;
|
||||||
extern crate trust_dns_resolver;
|
|
||||||
|
|
||||||
#[cfg(not(target_env = "msvc"))]
|
#[cfg(not(target_env = "msvc"))]
|
||||||
use jemallocator::Jemalloc;
|
use jemallocator::Jemalloc;
|
||||||
@@ -61,19 +60,36 @@ use std::str::FromStr;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use pgcat::config::{get_config, reload_config, VERSION};
|
mod admin;
|
||||||
use pgcat::dns_cache;
|
mod auth_passthrough;
|
||||||
use pgcat::messages::configure_socket;
|
mod client;
|
||||||
use pgcat::pool::{ClientServerMap, ConnectionPool};
|
mod config;
|
||||||
use pgcat::prometheus::start_metric_server;
|
mod constants;
|
||||||
use pgcat::stats::{Collector, Reporter, REPORTER};
|
mod errors;
|
||||||
|
mod messages;
|
||||||
|
mod mirrors;
|
||||||
|
mod multi_logger;
|
||||||
|
mod pool;
|
||||||
|
mod prometheus;
|
||||||
|
mod query_router;
|
||||||
|
mod scram;
|
||||||
|
mod server;
|
||||||
|
mod sharding;
|
||||||
|
mod stats;
|
||||||
|
mod tls;
|
||||||
|
|
||||||
|
use crate::config::{get_config, reload_config, VERSION};
|
||||||
|
use crate::messages::configure_socket;
|
||||||
|
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||||
|
use crate::prometheus::start_metric_server;
|
||||||
|
use crate::stats::{Collector, Reporter, REPORTER};
|
||||||
|
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
pgcat::multi_logger::MultiLogger::init().unwrap();
|
multi_logger::MultiLogger::init().unwrap();
|
||||||
|
|
||||||
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
||||||
|
|
||||||
if !pgcat::query_router::QueryRouter::setup() {
|
if !query_router::QueryRouter::setup() {
|
||||||
error!("Could not setup query router");
|
error!("Could not setup query router");
|
||||||
std::process::exit(exitcode::CONFIG);
|
std::process::exit(exitcode::CONFIG);
|
||||||
}
|
}
|
||||||
@@ -91,7 +107,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let runtime = Builder::new_multi_thread().worker_threads(1).build()?;
|
let runtime = Builder::new_multi_thread().worker_threads(1).build()?;
|
||||||
|
|
||||||
runtime.block_on(async {
|
runtime.block_on(async {
|
||||||
match pgcat::config::parse(&config_file).await {
|
match config::parse(&config_file).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Config parse error: {:?}", err);
|
error!("Config parse error: {:?}", err);
|
||||||
@@ -150,12 +166,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
// Statistics reporting.
|
// Statistics reporting.
|
||||||
REPORTER.store(Arc::new(Reporter::default()));
|
REPORTER.store(Arc::new(Reporter::default()));
|
||||||
|
|
||||||
// Starts (if enabled) dns cache before pools initialization
|
|
||||||
match dns_cache::CachedResolver::from_config().await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => error!("DNS cache initialization error: {:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Connection pool that allows to query all shards and replicas.
|
// Connection pool that allows to query all shards and replicas.
|
||||||
match ConnectionPool::from_config(client_server_map.clone()).await {
|
match ConnectionPool::from_config(client_server_map.clone()).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
@@ -285,7 +295,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let start = chrono::offset::Utc::now().naive_utc();
|
let start = chrono::offset::Utc::now().naive_utc();
|
||||||
|
|
||||||
match pgcat::client::client_entrypoint(
|
match client::client_entrypoint(
|
||||||
socket,
|
socket,
|
||||||
client_server_map,
|
client_server_map,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
@@ -316,7 +326,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
match err {
|
match err {
|
||||||
pgcat::errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
|
errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
|
||||||
_ => warn!("Client disconnected with error {:?}", err),
|
_ => warn!("Client disconnected with error {:?}", err),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -20,10 +20,6 @@ pub enum DataType {
|
|||||||
Text,
|
Text,
|
||||||
Int4,
|
Int4,
|
||||||
Numeric,
|
Numeric,
|
||||||
Bool,
|
|
||||||
Oid,
|
|
||||||
AnyArray,
|
|
||||||
Any,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<&DataType> for i32 {
|
impl From<&DataType> for i32 {
|
||||||
@@ -32,10 +28,6 @@ impl From<&DataType> for i32 {
|
|||||||
DataType::Text => 25,
|
DataType::Text => 25,
|
||||||
DataType::Int4 => 23,
|
DataType::Int4 => 23,
|
||||||
DataType::Numeric => 1700,
|
DataType::Numeric => 1700,
|
||||||
DataType::Bool => 16,
|
|
||||||
DataType::Oid => 26,
|
|
||||||
DataType::AnyArray => 2277,
|
|
||||||
DataType::Any => 2276,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -124,10 +116,7 @@ where
|
|||||||
|
|
||||||
/// Send the startup packet the server. We're pretending we're a Pg client.
|
/// Send the startup packet the server. We're pretending we're a Pg client.
|
||||||
/// This tells the server which user we are and what database we want.
|
/// This tells the server which user we are and what database we want.
|
||||||
pub async fn startup<S>(stream: &mut S, user: &str, database: &str) -> Result<(), Error>
|
pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Result<(), Error> {
|
||||||
where
|
|
||||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
||||||
{
|
|
||||||
let mut bytes = BytesMut::with_capacity(25);
|
let mut bytes = BytesMut::with_capacity(25);
|
||||||
|
|
||||||
bytes.put_i32(196608); // Protocol number
|
bytes.put_i32(196608); // Protocol number
|
||||||
@@ -161,21 +150,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ssl_request(stream: &mut TcpStream) -> Result<(), Error> {
|
|
||||||
let mut bytes = BytesMut::with_capacity(12);
|
|
||||||
|
|
||||||
bytes.put_i32(8);
|
|
||||||
bytes.put_i32(80877103);
|
|
||||||
|
|
||||||
match stream.write_all(&bytes).await {
|
|
||||||
Ok(_) => Ok(()),
|
|
||||||
Err(err) => Err(Error::SocketError(format!(
|
|
||||||
"Error writing SSLRequest to server socket - Error: {:?}",
|
|
||||||
err
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Parse the params the server sends as a key/value format.
|
/// Parse the params the server sends as a key/value format.
|
||||||
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
||||||
let mut result = HashMap::new();
|
let mut result = HashMap::new();
|
||||||
@@ -451,10 +425,6 @@ pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
|
|||||||
DataType::Text => -1,
|
DataType::Text => -1,
|
||||||
DataType::Int4 => 4,
|
DataType::Int4 => 4,
|
||||||
DataType::Numeric => -1,
|
DataType::Numeric => -1,
|
||||||
DataType::Bool => 1,
|
|
||||||
DataType::Oid => 4,
|
|
||||||
DataType::AnyArray => -1,
|
|
||||||
DataType::Any => -1,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
row_desc.put_i16(type_size);
|
row_desc.put_i16(type_size);
|
||||||
@@ -493,29 +463,6 @@ pub fn data_row(row: &Vec<String>) -> BytesMut {
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_row_nullable(row: &Vec<Option<String>>) -> BytesMut {
|
|
||||||
let mut res = BytesMut::new();
|
|
||||||
let mut data_row = BytesMut::new();
|
|
||||||
|
|
||||||
data_row.put_i16(row.len() as i16);
|
|
||||||
|
|
||||||
for column in row {
|
|
||||||
if let Some(column) = column {
|
|
||||||
let column = column.as_bytes();
|
|
||||||
data_row.put_i32(column.len() as i32);
|
|
||||||
data_row.put_slice(column);
|
|
||||||
} else {
|
|
||||||
data_row.put_i32(-1 as i32);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res.put_u8(b'D');
|
|
||||||
res.put_i32(data_row.len() as i32 + 4);
|
|
||||||
res.put(data_row);
|
|
||||||
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a CommandComplete message.
|
/// Create a CommandComplete message.
|
||||||
pub fn command_complete(command: &str) -> BytesMut {
|
pub fn command_complete(command: &str) -> BytesMut {
|
||||||
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
||||||
@@ -558,29 +505,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_all_flush<S>(stream: &mut S, buf: &[u8]) -> Result<(), Error>
|
|
||||||
where
|
|
||||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
||||||
{
|
|
||||||
match stream.write_all(buf).await {
|
|
||||||
Ok(_) => match stream.flush().await {
|
|
||||||
Ok(_) => Ok(()),
|
|
||||||
Err(err) => {
|
|
||||||
return Err(Error::SocketError(format!(
|
|
||||||
"Error flushing socket - Error: {:?}",
|
|
||||||
err
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
return Err(Error::SocketError(format!(
|
|
||||||
"Error writing to socket - Error: {:?}",
|
|
||||||
err
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read a complete message from the socket.
|
/// Read a complete message from the socket.
|
||||||
pub async fn read_message<S>(stream: &mut S) -> Result<BytesMut, Error>
|
pub async fn read_message<S>(stream: &mut S) -> Result<BytesMut, Error>
|
||||||
where
|
where
|
||||||
|
|||||||
@@ -43,7 +43,6 @@ impl MirroredClient {
|
|||||||
ClientServerMap::default(),
|
ClientServerMap::default(),
|
||||||
Arc::new(PoolStats::new(identifier, cfg.clone())),
|
Arc::new(PoolStats::new(identifier, cfg.clone())),
|
||||||
Arc::new(RwLock::new(None)),
|
Arc::new(RwLock::new(None)),
|
||||||
None,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
Pool::builder()
|
Pool::builder()
|
||||||
|
|||||||
@@ -1,120 +0,0 @@
|
|||||||
//! The intercept plugin.
|
|
||||||
//!
|
|
||||||
//! It intercepts queries and returns fake results.
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::{BufMut, BytesMut};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use sqlparser::ast::Statement;
|
|
||||||
|
|
||||||
use log::debug;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
config::Intercept as InterceptConfig,
|
|
||||||
errors::Error,
|
|
||||||
messages::{command_complete, data_row_nullable, row_description, DataType},
|
|
||||||
plugins::{Plugin, PluginOutput},
|
|
||||||
query_router::QueryRouter,
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: use these structs for deserialization
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct Rule {
|
|
||||||
query: String,
|
|
||||||
schema: Vec<Column>,
|
|
||||||
result: Vec<Vec<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct Column {
|
|
||||||
name: String,
|
|
||||||
data_type: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The intercept plugin.
|
|
||||||
pub struct Intercept<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub config: &'a InterceptConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<'a> Plugin for Intercept<'a> {
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error> {
|
|
||||||
if !self.enabled || ast.is_empty() {
|
|
||||||
return Ok(PluginOutput::Allow);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut config = self.config.clone();
|
|
||||||
config.substitute(
|
|
||||||
&query_router.pool_settings().db,
|
|
||||||
&query_router.pool_settings().user.username,
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut result = BytesMut::new();
|
|
||||||
|
|
||||||
for q in ast {
|
|
||||||
// Normalization
|
|
||||||
let q = q.to_string().to_ascii_lowercase();
|
|
||||||
|
|
||||||
for (_, target) in config.queries.iter() {
|
|
||||||
if target.query.as_str() == q {
|
|
||||||
debug!("Intercepting query: {}", q);
|
|
||||||
|
|
||||||
let rd = target
|
|
||||||
.schema
|
|
||||||
.iter()
|
|
||||||
.map(|row| {
|
|
||||||
let name = &row[0];
|
|
||||||
let data_type = &row[1];
|
|
||||||
(
|
|
||||||
name.as_str(),
|
|
||||||
match data_type.as_str() {
|
|
||||||
"text" => DataType::Text,
|
|
||||||
"anyarray" => DataType::AnyArray,
|
|
||||||
"oid" => DataType::Oid,
|
|
||||||
"bool" => DataType::Bool,
|
|
||||||
"int4" => DataType::Int4,
|
|
||||||
_ => DataType::Any,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect::<Vec<(&str, DataType)>>();
|
|
||||||
|
|
||||||
result.put(row_description(&rd));
|
|
||||||
|
|
||||||
target.result.iter().for_each(|row| {
|
|
||||||
let row = row
|
|
||||||
.iter()
|
|
||||||
.map(|s| {
|
|
||||||
let s = s.as_str().to_string();
|
|
||||||
|
|
||||||
if s == "" {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(s)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect::<Vec<Option<String>>>();
|
|
||||||
result.put(data_row_nullable(&row));
|
|
||||||
});
|
|
||||||
|
|
||||||
result.put(command_complete("SELECT"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !result.is_empty() {
|
|
||||||
result.put_u8(b'Z');
|
|
||||||
result.put_i32(5);
|
|
||||||
result.put_u8(b'I');
|
|
||||||
|
|
||||||
return Ok(PluginOutput::Intercept(result));
|
|
||||||
} else {
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,44 +0,0 @@
|
|||||||
//! The plugin ecosystem.
|
|
||||||
//!
|
|
||||||
//! Currently plugins only grant access or deny access to the database for a particual query.
|
|
||||||
//! Example use cases:
|
|
||||||
//! - block known bad queries
|
|
||||||
//! - block access to system catalogs
|
|
||||||
//! - block dangerous modifications like `DROP TABLE`
|
|
||||||
//! - etc
|
|
||||||
//!
|
|
||||||
|
|
||||||
pub mod intercept;
|
|
||||||
pub mod prewarmer;
|
|
||||||
pub mod query_logger;
|
|
||||||
pub mod table_access;
|
|
||||||
|
|
||||||
use crate::{errors::Error, query_router::QueryRouter};
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::BytesMut;
|
|
||||||
use sqlparser::ast::Statement;
|
|
||||||
|
|
||||||
pub use intercept::Intercept;
|
|
||||||
pub use query_logger::QueryLogger;
|
|
||||||
pub use table_access::TableAccess;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
|
||||||
pub enum PluginOutput {
|
|
||||||
Allow,
|
|
||||||
Deny(String),
|
|
||||||
Overwrite(Vec<Statement>),
|
|
||||||
Intercept(BytesMut),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait Plugin {
|
|
||||||
// Run before the query is sent to the server.
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error>;
|
|
||||||
|
|
||||||
// TODO: run after the result is returned
|
|
||||||
// async fn callback(&mut self, query_router: &QueryRouter);
|
|
||||||
}
|
|
||||||
@@ -1,28 +0,0 @@
|
|||||||
//! Prewarm new connections before giving them to the client.
|
|
||||||
use crate::{errors::Error, server::Server};
|
|
||||||
use log::info;
|
|
||||||
|
|
||||||
pub struct Prewarmer<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub server: &'a mut Server,
|
|
||||||
pub queries: &'a Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Prewarmer<'a> {
|
|
||||||
pub async fn run(&mut self) -> Result<(), Error> {
|
|
||||||
if !self.enabled {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
for query in self.queries {
|
|
||||||
info!(
|
|
||||||
"{} Prewarning with query: `{}`",
|
|
||||||
self.server.address(),
|
|
||||||
query
|
|
||||||
);
|
|
||||||
self.server.query(&query).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
//! Log all queries to stdout (or somewhere else, why not).
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
errors::Error,
|
|
||||||
plugins::{Plugin, PluginOutput},
|
|
||||||
query_router::QueryRouter,
|
|
||||||
};
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use log::info;
|
|
||||||
use sqlparser::ast::Statement;
|
|
||||||
|
|
||||||
pub struct QueryLogger<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub user: &'a str,
|
|
||||||
pub db: &'a str,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<'a> Plugin for QueryLogger<'a> {
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
_query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error> {
|
|
||||||
if !self.enabled {
|
|
||||||
return Ok(PluginOutput::Allow);
|
|
||||||
}
|
|
||||||
|
|
||||||
let query = ast
|
|
||||||
.iter()
|
|
||||||
.map(|q| q.to_string())
|
|
||||||
.collect::<Vec<String>>()
|
|
||||||
.join("; ");
|
|
||||||
info!("[pool: {}][user: {}] {}", self.user, self.db, query);
|
|
||||||
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,59 +0,0 @@
|
|||||||
//! This query router plugin will check if the user can access a particular
|
|
||||||
//! table as part of their query. If they can't, the query will not be routed.
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use sqlparser::ast::{visit_relations, Statement};
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
errors::Error,
|
|
||||||
plugins::{Plugin, PluginOutput},
|
|
||||||
query_router::QueryRouter,
|
|
||||||
};
|
|
||||||
|
|
||||||
use log::debug;
|
|
||||||
|
|
||||||
use core::ops::ControlFlow;
|
|
||||||
|
|
||||||
pub struct TableAccess<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub tables: &'a Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<'a> Plugin for TableAccess<'a> {
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
_query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error> {
|
|
||||||
if !self.enabled {
|
|
||||||
return Ok(PluginOutput::Allow);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut found = None;
|
|
||||||
|
|
||||||
visit_relations(ast, |relation| {
|
|
||||||
let relation = relation.to_string();
|
|
||||||
let parts = relation.split(".").collect::<Vec<&str>>();
|
|
||||||
let table_name = parts.last().unwrap();
|
|
||||||
|
|
||||||
if self.tables.contains(&table_name.to_string()) {
|
|
||||||
found = Some(table_name.to_string());
|
|
||||||
ControlFlow::<()>::Break(())
|
|
||||||
} else {
|
|
||||||
ControlFlow::<()>::Continue(())
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Some(found) = found {
|
|
||||||
debug!("Blocking access to table \"{}\"", found);
|
|
||||||
|
|
||||||
Ok(PluginOutput::Deny(format!(
|
|
||||||
"permission for table \"{}\" denied",
|
|
||||||
found
|
|
||||||
)))
|
|
||||||
} else {
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
145
src/pool.rs
145
src/pool.rs
@@ -17,13 +17,10 @@ use std::sync::{
|
|||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
use crate::config::{
|
use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User};
|
||||||
get_config, Address, General, LoadBalancingMode, Plugins, PoolMode, Role, User,
|
|
||||||
};
|
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
|
|
||||||
use crate::auth_passthrough::AuthPassthrough;
|
use crate::auth_passthrough::AuthPassthrough;
|
||||||
use crate::plugins::prewarmer;
|
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
use crate::sharding::ShardingFunction;
|
use crate::sharding::ShardingFunction;
|
||||||
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
|
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
|
||||||
@@ -64,8 +61,6 @@ pub struct PoolIdentifier {
|
|||||||
pub user: String,
|
pub user: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
static POOL_REAPER_RATE: u64 = 30_000; // 30 seconds by default
|
|
||||||
|
|
||||||
impl PoolIdentifier {
|
impl PoolIdentifier {
|
||||||
/// Create a new user/pool identifier.
|
/// Create a new user/pool identifier.
|
||||||
pub fn new(db: &str, user: &str) -> PoolIdentifier {
|
pub fn new(db: &str, user: &str) -> PoolIdentifier {
|
||||||
@@ -96,7 +91,6 @@ pub struct PoolSettings {
|
|||||||
|
|
||||||
// Connecting user.
|
// Connecting user.
|
||||||
pub user: User,
|
pub user: User,
|
||||||
pub db: String,
|
|
||||||
|
|
||||||
// Default server role to connect to.
|
// Default server role to connect to.
|
||||||
pub default_role: Option<Role>,
|
pub default_role: Option<Role>,
|
||||||
@@ -135,9 +129,6 @@ pub struct PoolSettings {
|
|||||||
pub auth_query: Option<String>,
|
pub auth_query: Option<String>,
|
||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
|
|
||||||
/// Plugins
|
|
||||||
pub plugins: Option<Plugins>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for PoolSettings {
|
impl Default for PoolSettings {
|
||||||
@@ -147,7 +138,6 @@ impl Default for PoolSettings {
|
|||||||
load_balancing_mode: LoadBalancingMode::Random,
|
load_balancing_mode: LoadBalancingMode::Random,
|
||||||
shards: 1,
|
shards: 1,
|
||||||
user: User::default(),
|
user: User::default(),
|
||||||
db: String::default(),
|
|
||||||
default_role: None,
|
default_role: None,
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
primary_reads_enabled: true,
|
primary_reads_enabled: true,
|
||||||
@@ -162,7 +152,6 @@ impl Default for PoolSettings {
|
|||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
plugins: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -202,7 +191,6 @@ pub struct ConnectionPool {
|
|||||||
paused: Arc<AtomicBool>,
|
paused: Arc<AtomicBool>,
|
||||||
paused_waiter: Arc<Notify>,
|
paused_waiter: Arc<Notify>,
|
||||||
|
|
||||||
/// Statistics.
|
|
||||||
pub stats: Arc<PoolStats>,
|
pub stats: Arc<PoolStats>,
|
||||||
|
|
||||||
/// AuthInfo
|
/// AuthInfo
|
||||||
@@ -323,34 +311,21 @@ impl ConnectionPool {
|
|||||||
|
|
||||||
if let Some(apt) = &auth_passthrough {
|
if let Some(apt) = &auth_passthrough {
|
||||||
match apt.fetch_hash(&address).await {
|
match apt.fetch_hash(&address).await {
|
||||||
Ok(ok) => {
|
Ok(ok) => {
|
||||||
if let Some(ref pool_auth_hash_value) = *(pool_auth_hash.read())
|
if let Some(ref pool_auth_hash_value) = *(pool_auth_hash.read()) {
|
||||||
{
|
if ok != *pool_auth_hash_value {
|
||||||
if ok != *pool_auth_hash_value {
|
warn!("Hash is not the same across shards of the same pool, client auth will \
|
||||||
warn!(
|
be done using last obtained hash. Server: {}:{}, Database: {}", server.host, server.port, shard.database);
|
||||||
"Hash is not the same across shards \
|
}
|
||||||
of the same pool, client auth will \
|
}
|
||||||
be done using last obtained hash. \
|
debug!("Hash obtained for {:?}", address);
|
||||||
Server: {}:{}, Database: {}",
|
{
|
||||||
server.host, server.port, shard.database,
|
let mut pool_auth_hash = pool_auth_hash.write();
|
||||||
);
|
*pool_auth_hash = Some(ok.clone());
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
Err(err) => warn!("Could not obtain password hashes using auth_query config, ignoring. Error: {:?}", err),
|
||||||
debug!("Hash obtained for {:?}", address);
|
}
|
||||||
|
|
||||||
{
|
|
||||||
let mut pool_auth_hash = pool_auth_hash.write();
|
|
||||||
*pool_auth_hash = Some(ok.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => warn!(
|
|
||||||
"Could not obtain password hashes \
|
|
||||||
using auth_query config, ignoring. \
|
|
||||||
Error: {:?}",
|
|
||||||
err,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let manager = ServerPool::new(
|
let manager = ServerPool::new(
|
||||||
@@ -360,10 +335,6 @@ impl ConnectionPool {
|
|||||||
client_server_map.clone(),
|
client_server_map.clone(),
|
||||||
pool_stats.clone(),
|
pool_stats.clone(),
|
||||||
pool_auth_hash.clone(),
|
pool_auth_hash.clone(),
|
||||||
match pool_config.plugins {
|
|
||||||
Some(ref plugins) => Some(plugins.clone()),
|
|
||||||
None => config.plugins.clone(),
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let connect_timeout = match pool_config.connect_timeout {
|
let connect_timeout = match pool_config.connect_timeout {
|
||||||
@@ -376,38 +347,14 @@ impl ConnectionPool {
|
|||||||
None => config.general.idle_timeout,
|
None => config.general.idle_timeout,
|
||||||
};
|
};
|
||||||
|
|
||||||
let server_lifetime = match user.server_lifetime {
|
|
||||||
Some(server_lifetime) => server_lifetime,
|
|
||||||
None => match pool_config.server_lifetime {
|
|
||||||
Some(server_lifetime) => server_lifetime,
|
|
||||||
None => config.general.server_lifetime,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let reaper_rate = *vec![idle_timeout, server_lifetime, POOL_REAPER_RATE]
|
|
||||||
.iter()
|
|
||||||
.min()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"[pool: {}][user: {}] Pool reaper rate: {}ms",
|
|
||||||
pool_name, user.username, reaper_rate
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = Pool::builder()
|
let pool = Pool::builder()
|
||||||
.max_size(user.pool_size)
|
.max_size(user.pool_size)
|
||||||
.min_idle(user.min_pool_size)
|
|
||||||
.connection_timeout(std::time::Duration::from_millis(connect_timeout))
|
.connection_timeout(std::time::Duration::from_millis(connect_timeout))
|
||||||
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
|
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
|
||||||
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
|
.test_on_check_out(false)
|
||||||
.reaper_rate(std::time::Duration::from_millis(reaper_rate))
|
.build(manager)
|
||||||
.test_on_check_out(false);
|
.await
|
||||||
|
.unwrap();
|
||||||
let pool = if config.general.validate_config {
|
|
||||||
pool.build(manager).await?
|
|
||||||
} else {
|
|
||||||
pool.build_unchecked(manager)
|
|
||||||
};
|
|
||||||
|
|
||||||
pools.push(pool);
|
pools.push(pool);
|
||||||
servers.push(address);
|
servers.push(address);
|
||||||
@@ -443,7 +390,6 @@ impl ConnectionPool {
|
|||||||
// shards: pool_config.shards.clone(),
|
// shards: pool_config.shards.clone(),
|
||||||
shards: shard_ids.len(),
|
shards: shard_ids.len(),
|
||||||
user: user.clone(),
|
user: user.clone(),
|
||||||
db: pool_name.clone(),
|
|
||||||
default_role: match pool_config.default_role.as_str() {
|
default_role: match pool_config.default_role.as_str() {
|
||||||
"any" => None,
|
"any" => None,
|
||||||
"replica" => Some(Role::Replica),
|
"replica" => Some(Role::Replica),
|
||||||
@@ -469,10 +415,6 @@ impl ConnectionPool {
|
|||||||
auth_query: pool_config.auth_query.clone(),
|
auth_query: pool_config.auth_query.clone(),
|
||||||
auth_query_user: pool_config.auth_query_user.clone(),
|
auth_query_user: pool_config.auth_query_user.clone(),
|
||||||
auth_query_password: pool_config.auth_query_password.clone(),
|
auth_query_password: pool_config.auth_query_password.clone(),
|
||||||
plugins: match pool_config.plugins {
|
|
||||||
Some(ref plugins) => Some(plugins.clone()),
|
|
||||||
None => config.plugins.clone(),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
validated: Arc::new(AtomicBool::new(false)),
|
validated: Arc::new(AtomicBool::new(false)),
|
||||||
paused: Arc::new(AtomicBool::new(false)),
|
paused: Arc::new(AtomicBool::new(false)),
|
||||||
@@ -482,12 +424,10 @@ impl ConnectionPool {
|
|||||||
// Connect to the servers to make sure pool configuration is valid
|
// Connect to the servers to make sure pool configuration is valid
|
||||||
// before setting it globally.
|
// before setting it globally.
|
||||||
// Do this async and somewhere else, we don't have to wait here.
|
// Do this async and somewhere else, we don't have to wait here.
|
||||||
if config.general.validate_config {
|
let mut validate_pool = pool.clone();
|
||||||
let mut validate_pool = pool.clone();
|
tokio::task::spawn(async move {
|
||||||
tokio::task::spawn(async move {
|
let _ = validate_pool.validate().await;
|
||||||
let _ = validate_pool.validate().await;
|
});
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// There is one pool per database/user pair.
|
// There is one pool per database/user pair.
|
||||||
new_pools.insert(PoolIdentifier::new(pool_name, &user.username), pool);
|
new_pools.insert(PoolIdentifier::new(pool_name, &user.username), pool);
|
||||||
@@ -638,10 +578,7 @@ impl ConnectionPool {
|
|||||||
{
|
{
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!("Banning instance {:?}, error: {:?}", address, err);
|
||||||
"Connection checkout error for instance {:?}, error: {:?}",
|
|
||||||
address, err
|
|
||||||
);
|
|
||||||
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
|
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
|
||||||
address.stats.error();
|
address.stats.error();
|
||||||
client_stats.idle();
|
client_stats.idle();
|
||||||
@@ -717,7 +654,7 @@ impl ConnectionPool {
|
|||||||
// Health check failed.
|
// Health check failed.
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Failed health check on instance {:?}, error: {:?}",
|
"Banning instance {:?} because of failed health check, {:?}",
|
||||||
address, err
|
address, err
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -726,7 +663,7 @@ impl ConnectionPool {
|
|||||||
// Health check timed out.
|
// Health check timed out.
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Health check timeout on instance {:?}, error: {:?}",
|
"Banning instance {:?} because of health check timeout, {:?}",
|
||||||
address, err
|
address, err
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -748,16 +685,13 @@ impl ConnectionPool {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
error!("Banning instance {:?}, reason: {:?}", address, reason);
|
|
||||||
|
|
||||||
let now = chrono::offset::Utc::now().naive_utc();
|
let now = chrono::offset::Utc::now().naive_utc();
|
||||||
let mut guard = self.banlist.write();
|
let mut guard = self.banlist.write();
|
||||||
|
error!("Banning {:?}", address);
|
||||||
if let Some(client_info) = client_info {
|
if let Some(client_info) = client_info {
|
||||||
client_info.ban_error();
|
client_info.ban_error();
|
||||||
address.stats.error();
|
address.stats.error();
|
||||||
}
|
}
|
||||||
|
|
||||||
guard[address.shard].insert(address.clone(), (reason, now));
|
guard[address.shard].insert(address.clone(), (reason, now));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -843,6 +777,7 @@ impl ConnectionPool {
|
|||||||
self.databases.len()
|
self.databases.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Retrieve all bans for all servers.
|
||||||
pub fn get_bans(&self) -> Vec<(Address, (BanReason, NaiveDateTime))> {
|
pub fn get_bans(&self) -> Vec<(Address, (BanReason, NaiveDateTime))> {
|
||||||
let mut bans: Vec<(Address, (BanReason, NaiveDateTime))> = Vec::new();
|
let mut bans: Vec<(Address, (BanReason, NaiveDateTime))> = Vec::new();
|
||||||
let guard = self.banlist.read();
|
let guard = self.banlist.read();
|
||||||
@@ -854,7 +789,7 @@ impl ConnectionPool {
|
|||||||
return bans;
|
return bans;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the address from the host url
|
/// Get the address from the host url.
|
||||||
pub fn get_addresses_from_host(&self, host: &str) -> Vec<Address> {
|
pub fn get_addresses_from_host(&self, host: &str) -> Vec<Address> {
|
||||||
let mut addresses = Vec::new();
|
let mut addresses = Vec::new();
|
||||||
for shard in 0..self.shards() {
|
for shard in 0..self.shards() {
|
||||||
@@ -893,10 +828,13 @@ impl ConnectionPool {
|
|||||||
&self.addresses[shard][server]
|
&self.addresses[shard][server]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get server settings retrieved at connection setup.
|
||||||
pub fn server_info(&self) -> BytesMut {
|
pub fn server_info(&self) -> BytesMut {
|
||||||
self.server_info.read().clone()
|
self.server_info.read().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Calculate how many used connections in the pool
|
||||||
|
/// for the given server.
|
||||||
fn busy_connection_count(&self, address: &Address) -> u32 {
|
fn busy_connection_count(&self, address: &Address) -> u32 {
|
||||||
let state = self.pool_state(address.shard, address.address_index);
|
let state = self.pool_state(address.shard, address.address_index);
|
||||||
let idle = state.idle_connections;
|
let idle = state.idle_connections;
|
||||||
@@ -920,7 +858,6 @@ pub struct ServerPool {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
stats: Arc<PoolStats>,
|
stats: Arc<PoolStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
plugins: Option<Plugins>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerPool {
|
impl ServerPool {
|
||||||
@@ -931,7 +868,6 @@ impl ServerPool {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
stats: Arc<PoolStats>,
|
stats: Arc<PoolStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
plugins: Option<Plugins>,
|
|
||||||
) -> ServerPool {
|
) -> ServerPool {
|
||||||
ServerPool {
|
ServerPool {
|
||||||
address,
|
address,
|
||||||
@@ -940,7 +876,6 @@ impl ServerPool {
|
|||||||
client_server_map,
|
client_server_map,
|
||||||
stats,
|
stats,
|
||||||
auth_hash,
|
auth_hash,
|
||||||
plugins,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -973,19 +908,7 @@ impl ManageConnection for ServerPool {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(mut conn) => {
|
Ok(conn) => {
|
||||||
if let Some(ref plugins) = self.plugins {
|
|
||||||
if let Some(ref prewarmer) = plugins.prewarmer {
|
|
||||||
let mut prewarmer = prewarmer::Prewarmer {
|
|
||||||
enabled: prewarmer.enabled,
|
|
||||||
server: &mut conn,
|
|
||||||
queries: &prewarmer.queries,
|
|
||||||
};
|
|
||||||
|
|
||||||
prewarmer.run().await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stats.idle();
|
stats.idle();
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,16 +6,13 @@ use once_cell::sync::OnceCell;
|
|||||||
use regex::{Regex, RegexSet};
|
use regex::{Regex, RegexSet};
|
||||||
use sqlparser::ast::Statement::{Query, StartTransaction};
|
use sqlparser::ast::Statement::{Query, StartTransaction};
|
||||||
use sqlparser::ast::{
|
use sqlparser::ast::{
|
||||||
BinaryOperator, Expr, Ident, JoinConstraint, JoinOperator, SetExpr, Statement, TableFactor,
|
BinaryOperator, Expr, Ident, JoinConstraint, JoinOperator, SetExpr, TableFactor, Value,
|
||||||
Value,
|
|
||||||
};
|
};
|
||||||
use sqlparser::dialect::PostgreSqlDialect;
|
use sqlparser::dialect::PostgreSqlDialect;
|
||||||
use sqlparser::parser::Parser;
|
use sqlparser::parser::Parser;
|
||||||
|
|
||||||
use crate::config::Role;
|
use crate::config::Role;
|
||||||
use crate::errors::Error;
|
|
||||||
use crate::messages::BytesMutReader;
|
use crate::messages::BytesMutReader;
|
||||||
use crate::plugins::{Intercept, Plugin, PluginOutput, QueryLogger, TableAccess};
|
|
||||||
use crate::pool::PoolSettings;
|
use crate::pool::PoolSettings;
|
||||||
use crate::sharding::Sharder;
|
use crate::sharding::Sharder;
|
||||||
|
|
||||||
@@ -132,10 +129,6 @@ impl QueryRouter {
|
|||||||
self.pool_settings = pool_settings;
|
self.pool_settings = pool_settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn pool_settings<'a>(&'a self) -> &'a PoolSettings {
|
|
||||||
&self.pool_settings
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to parse a command and execute it.
|
/// Try to parse a command and execute it.
|
||||||
pub fn try_execute_command(&mut self, message_buffer: &BytesMut) -> Option<(Command, String)> {
|
pub fn try_execute_command(&mut self, message_buffer: &BytesMut) -> Option<(Command, String)> {
|
||||||
let mut message_cursor = Cursor::new(message_buffer);
|
let mut message_cursor = Cursor::new(message_buffer);
|
||||||
@@ -331,7 +324,10 @@ impl QueryRouter {
|
|||||||
Some((command, value))
|
Some((command, value))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse(message: &BytesMut) -> Result<Vec<sqlparser::ast::Statement>, Error> {
|
/// Try to infer which server to connect to based on the contents of the query.
|
||||||
|
pub fn infer(&mut self, message: &BytesMut) -> bool {
|
||||||
|
debug!("Inferring role");
|
||||||
|
|
||||||
let mut message_cursor = Cursor::new(message);
|
let mut message_cursor = Cursor::new(message);
|
||||||
|
|
||||||
let code = message_cursor.get_u8() as char;
|
let code = message_cursor.get_u8() as char;
|
||||||
@@ -357,29 +353,28 @@ impl QueryRouter {
|
|||||||
query
|
query
|
||||||
}
|
}
|
||||||
|
|
||||||
_ => return Err(Error::UnsupportedStatement),
|
_ => return false,
|
||||||
};
|
};
|
||||||
|
|
||||||
match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
|
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
|
||||||
Ok(ast) => Ok(ast),
|
Ok(ast) => ast,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
// SELECT ... FOR UPDATE won't get parsed correctly.
|
||||||
debug!("{}: {}", err, query);
|
debug!("{}: {}", err, query);
|
||||||
Err(Error::QueryRouterParserError(err.to_string()))
|
self.active_role = Some(Role::Primary);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to infer which server to connect to based on the contents of the query.
|
debug!("AST: {:?}", ast);
|
||||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
|
||||||
debug!("Inferring role");
|
|
||||||
|
|
||||||
if ast.is_empty() {
|
if ast.is_empty() {
|
||||||
// That's weird, no idea, let's go to primary
|
// That's weird, no idea, let's go to primary
|
||||||
self.active_role = Some(Role::Primary);
|
self.active_role = Some(Role::Primary);
|
||||||
return Err(Error::QueryRouterParserError("empty query".into()));
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
for q in ast {
|
for q in &ast {
|
||||||
match q {
|
match q {
|
||||||
// All transactions go to the primary, probably a write.
|
// All transactions go to the primary, probably a write.
|
||||||
StartTransaction { .. } => {
|
StartTransaction { .. } => {
|
||||||
@@ -423,7 +418,7 @@ impl QueryRouter {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the shard number from the Bind message
|
/// Parse the shard number from the Bind message
|
||||||
@@ -788,52 +783,6 @@ impl QueryRouter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add your plugins here and execute them.
|
|
||||||
pub async fn execute_plugins(&self, ast: &Vec<Statement>) -> Result<PluginOutput, Error> {
|
|
||||||
let plugins = match self.pool_settings.plugins {
|
|
||||||
Some(ref plugins) => plugins,
|
|
||||||
None => return Ok(PluginOutput::Allow),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(ref query_logger) = plugins.query_logger {
|
|
||||||
let mut query_logger = QueryLogger {
|
|
||||||
enabled: query_logger.enabled,
|
|
||||||
user: &self.pool_settings.user.username,
|
|
||||||
db: &self.pool_settings.db,
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = query_logger.run(&self, ast).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ref intercept) = plugins.intercept {
|
|
||||||
let mut intercept = Intercept {
|
|
||||||
enabled: intercept.enabled,
|
|
||||||
config: &intercept,
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = intercept.run(&self, ast).await;
|
|
||||||
|
|
||||||
if let Ok(PluginOutput::Intercept(output)) = result {
|
|
||||||
return Ok(PluginOutput::Intercept(output));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ref table_access) = plugins.table_access {
|
|
||||||
let mut table_access = TableAccess {
|
|
||||||
enabled: table_access.enabled,
|
|
||||||
tables: &table_access.tables,
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = table_access.run(&self, ast).await;
|
|
||||||
|
|
||||||
if let Ok(PluginOutput::Deny(error)) = result {
|
|
||||||
return Ok(PluginOutput::Deny(error));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_sharding_key(&mut self, sharding_key: i64) -> Option<usize> {
|
fn set_sharding_key(&mut self, sharding_key: i64) -> Option<usize> {
|
||||||
let sharder = Sharder::new(
|
let sharder = Sharder::new(
|
||||||
self.pool_settings.shards,
|
self.pool_settings.shards,
|
||||||
@@ -861,23 +810,12 @@ impl QueryRouter {
|
|||||||
/// Should we attempt to parse queries?
|
/// Should we attempt to parse queries?
|
||||||
pub fn query_parser_enabled(&self) -> bool {
|
pub fn query_parser_enabled(&self) -> bool {
|
||||||
let enabled = match self.query_parser_enabled {
|
let enabled = match self.query_parser_enabled {
|
||||||
None => {
|
None => self.pool_settings.query_parser_enabled,
|
||||||
debug!(
|
Some(value) => value,
|
||||||
"Using pool settings, query_parser_enabled: {}",
|
|
||||||
self.pool_settings.query_parser_enabled
|
|
||||||
);
|
|
||||||
self.pool_settings.query_parser_enabled
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(value) => {
|
|
||||||
debug!(
|
|
||||||
"Using query parser override, query_parser_enabled: {}",
|
|
||||||
value
|
|
||||||
);
|
|
||||||
value
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
debug!("Query parser enabled: {}", enabled);
|
||||||
|
|
||||||
enabled
|
enabled
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -924,7 +862,7 @@ mod test {
|
|||||||
|
|
||||||
for query in queries {
|
for query in queries {
|
||||||
// It's a recognized query
|
// It's a recognized query
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Replica));
|
assert_eq!(qr.role(), Some(Role::Replica));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -943,7 +881,7 @@ mod test {
|
|||||||
|
|
||||||
for query in queries {
|
for query in queries {
|
||||||
// It's a recognized query
|
// It's a recognized query
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Primary));
|
assert_eq!(qr.role(), Some(Role::Primary));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -955,7 +893,7 @@ mod test {
|
|||||||
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
||||||
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO on")) != None);
|
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO on")) != None);
|
||||||
|
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), None);
|
assert_eq!(qr.role(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -975,7 +913,7 @@ mod test {
|
|||||||
res.put(prepared_stmt);
|
res.put(prepared_stmt);
|
||||||
res.put_i16(0);
|
res.put_i16(0);
|
||||||
|
|
||||||
assert!(qr.infer(&QueryRouter::parse(&res).unwrap()).is_ok());
|
assert!(qr.infer(&res));
|
||||||
assert_eq!(qr.role(), Some(Role::Replica));
|
assert_eq!(qr.role(), Some(Role::Replica));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1139,11 +1077,11 @@ mod test {
|
|||||||
assert_eq!(qr.role(), None);
|
assert_eq!(qr.role(), None);
|
||||||
|
|
||||||
let query = simple_query("INSERT INTO test_table VALUES (1)");
|
let query = simple_query("INSERT INTO test_table VALUES (1)");
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Primary));
|
assert_eq!(qr.role(), Some(Role::Primary));
|
||||||
|
|
||||||
let query = simple_query("SELECT * FROM test_table");
|
let query = simple_query("SELECT * FROM test_table");
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Replica));
|
assert_eq!(qr.role(), Some(Role::Replica));
|
||||||
|
|
||||||
assert!(qr.query_parser_enabled());
|
assert!(qr.query_parser_enabled());
|
||||||
@@ -1175,8 +1113,6 @@ mod test {
|
|||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
db: "test".to_string(),
|
|
||||||
plugins: None,
|
|
||||||
};
|
};
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
assert_eq!(qr.active_role, None);
|
assert_eq!(qr.active_role, None);
|
||||||
@@ -1206,24 +1142,15 @@ mod test {
|
|||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("BEGIN; SELECT 1; COMMIT;")));
|
||||||
.infer(&QueryRouter::parse(&simple_query("BEGIN; SELECT 1; COMMIT;")).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.role(), Role::Primary);
|
assert_eq!(qr.role(), Role::Primary);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("SELECT 1; SELECT 2;")));
|
||||||
.infer(&QueryRouter::parse(&simple_query("SELECT 1; SELECT 2;")).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.role(), Role::Replica);
|
assert_eq!(qr.role(), Role::Replica);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.role(), Role::Primary);
|
assert_eq!(qr.role(), Role::Primary);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1250,10 +1177,7 @@ mod test {
|
|||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
db: "test".to_string(),
|
|
||||||
plugins: None,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
qr.update_pool_settings(pool_settings.clone());
|
qr.update_pool_settings(pool_settings.clone());
|
||||||
|
|
||||||
@@ -1284,84 +1208,47 @@ mod test {
|
|||||||
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
||||||
qr.pool_settings.shards = 3;
|
qr.pool_settings.shards = 3;
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("SELECT * FROM data WHERE id = 5")));
|
||||||
.infer(&QueryRouter::parse(&simple_query("SELECT * FROM data WHERE id = 5")).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT one, two, three FROM public.data WHERE id = 6"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT one, two, three FROM public.data WHERE id = 6"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT * FROM data
|
||||||
&QueryRouter::parse(&simple_query(
|
|
||||||
"SELECT * FROM data
|
|
||||||
INNER JOIN t2 ON data.id = 5
|
INNER JOIN t2 ON data.id = 5
|
||||||
AND t2.data_id = data.id
|
AND t2.data_id = data.id
|
||||||
WHERE data.id = 5"
|
WHERE data.id = 5"
|
||||||
))
|
)));
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
// Shard did not move because we couldn't determine the sharding key since it could be ambiguous
|
// Shard did not move because we couldn't determine the sharding key since it could be ambiguous
|
||||||
// in the query.
|
// in the query.
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT * FROM t2 INNER JOIN data ON id = 6 AND data.id = t2.data_id"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT * FROM t2 INNER JOIN data ON id = 6 AND data.id = t2.data_id"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
r#"SELECT * FROM "public"."data" WHERE "id" = 6"#
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
r#"SELECT * FROM "public"."data" WHERE "id" = 6"#
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
r#"SELECT * FROM "public"."data" WHERE "data"."id" = 5"#
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
r#"SELECT * FROM "public"."data" WHERE "data"."id" = 5"#
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
// Super unique sharding key
|
// Super unique sharding key
|
||||||
qr.pool_settings.automatic_sharding_key = Some("*.unique_enough_column_name".to_string());
|
qr.pool_settings.automatic_sharding_key = Some("*.unique_enough_column_name".to_string());
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT * FROM table_x WHERE unique_enough_column_name = 6"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT * FROM table_x WHERE unique_enough_column_name = 6"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("SELECT * FROM table_y WHERE another_key = 5")));
|
||||||
.infer(
|
|
||||||
&QueryRouter::parse(&simple_query("SELECT * FROM table_y WHERE another_key = 5"))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1385,61 +1272,11 @@ mod test {
|
|||||||
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
||||||
qr.pool_settings.shards = 3;
|
qr.pool_settings.shards = 3;
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(stmt)));
|
||||||
.infer(&QueryRouter::parse(&simple_query(stmt)).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.placeholders.len(), 1);
|
assert_eq!(qr.placeholders.len(), 1);
|
||||||
|
|
||||||
assert!(qr.infer_shard_from_bind(&bind));
|
assert!(qr.infer_shard_from_bind(&bind));
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
assert!(qr.placeholders.is_empty());
|
assert!(qr.placeholders.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_table_access_plugin() {
|
|
||||||
use crate::config::{Plugins, TableAccess};
|
|
||||||
let table_access = TableAccess {
|
|
||||||
enabled: true,
|
|
||||||
tables: vec![String::from("pg_database")],
|
|
||||||
};
|
|
||||||
let plugins = Plugins {
|
|
||||||
table_access: Some(table_access),
|
|
||||||
intercept: None,
|
|
||||||
query_logger: None,
|
|
||||||
prewarmer: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
QueryRouter::setup();
|
|
||||||
let mut pool_settings = PoolSettings::default();
|
|
||||||
pool_settings.query_parser_enabled = true;
|
|
||||||
pool_settings.plugins = Some(plugins);
|
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
|
||||||
qr.update_pool_settings(pool_settings);
|
|
||||||
|
|
||||||
let query = simple_query("SELECT * FROM pg_database");
|
|
||||||
let ast = QueryRouter::parse(&query).unwrap();
|
|
||||||
|
|
||||||
let res = qr.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
res,
|
|
||||||
Ok(PluginOutput::Deny(
|
|
||||||
"permission for table \"pg_database\" denied".to_string()
|
|
||||||
))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_plugins_disabled_by_defaault() {
|
|
||||||
QueryRouter::setup();
|
|
||||||
let qr = QueryRouter::new();
|
|
||||||
|
|
||||||
let query = simple_query("SELECT * FROM pg_database");
|
|
||||||
let ast = QueryRouter::parse(&query).unwrap();
|
|
||||||
|
|
||||||
let res = qr.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
assert_eq!(res, Ok(PluginOutput::Allow));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
358
src/server.rs
358
src/server.rs
@@ -7,143 +7,22 @@ use parking_lot::{Mutex, RwLock};
|
|||||||
use postgres_protocol::message;
|
use postgres_protocol::message;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
|
use tokio::io::{AsyncReadExt, BufReader};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::{
|
||||||
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
|
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||||
use tokio_rustls::{client::TlsStream, TlsConnector};
|
TcpStream,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::config::{get_config, Address, User};
|
use crate::config::{Address, User};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::dns_cache::{AddrSet, CACHED_RESOLVER};
|
|
||||||
use crate::errors::{Error, ServerIdentifier};
|
use crate::errors::{Error, ServerIdentifier};
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::mirrors::MirroringManager;
|
use crate::mirrors::MirroringManager;
|
||||||
use crate::pool::ClientServerMap;
|
use crate::pool::ClientServerMap;
|
||||||
use crate::scram::ScramSha256;
|
use crate::scram::ScramSha256;
|
||||||
use crate::stats::ServerStats;
|
use crate::stats::ServerStats;
|
||||||
use std::io::Write;
|
|
||||||
|
|
||||||
use pin_project::pin_project;
|
|
||||||
|
|
||||||
#[pin_project(project = SteamInnerProj)]
|
|
||||||
pub enum StreamInner {
|
|
||||||
Plain {
|
|
||||||
#[pin]
|
|
||||||
stream: TcpStream,
|
|
||||||
},
|
|
||||||
Tls {
|
|
||||||
#[pin]
|
|
||||||
stream: TlsStream<TcpStream>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncWrite for StreamInner {
|
|
||||||
fn poll_write(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
buf: &[u8],
|
|
||||||
) -> std::task::Poll<Result<usize, std::io::Error>> {
|
|
||||||
let this = self.project();
|
|
||||||
match this {
|
|
||||||
SteamInnerProj::Tls { stream } => stream.poll_write(cx, buf),
|
|
||||||
SteamInnerProj::Plain { stream } => stream.poll_write(cx, buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
|
||||||
let this = self.project();
|
|
||||||
match this {
|
|
||||||
SteamInnerProj::Tls { stream } => stream.poll_flush(cx),
|
|
||||||
SteamInnerProj::Plain { stream } => stream.poll_flush(cx),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_shutdown(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
|
||||||
let this = self.project();
|
|
||||||
match this {
|
|
||||||
SteamInnerProj::Tls { stream } => stream.poll_shutdown(cx),
|
|
||||||
SteamInnerProj::Plain { stream } => stream.poll_shutdown(cx),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for StreamInner {
|
|
||||||
fn poll_read(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
buf: &mut tokio::io::ReadBuf<'_>,
|
|
||||||
) -> std::task::Poll<std::io::Result<()>> {
|
|
||||||
let this = self.project();
|
|
||||||
match this {
|
|
||||||
SteamInnerProj::Tls { stream } => stream.poll_read(cx, buf),
|
|
||||||
SteamInnerProj::Plain { stream } => stream.poll_read(cx, buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StreamInner {
|
|
||||||
pub fn try_write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
||||||
match self {
|
|
||||||
StreamInner::Tls { stream } => {
|
|
||||||
let r = stream.get_mut();
|
|
||||||
let mut w = r.1.writer();
|
|
||||||
w.write(buf)
|
|
||||||
}
|
|
||||||
StreamInner::Plain { stream } => stream.try_write(buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
|
||||||
struct CleanupState {
|
|
||||||
/// If server connection requires DISCARD ALL before checkin because of set statement
|
|
||||||
needs_cleanup_set: bool,
|
|
||||||
|
|
||||||
/// If server connection requires DISCARD ALL before checkin because of prepare statement
|
|
||||||
needs_cleanup_prepare: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CleanupState {
|
|
||||||
fn new() -> Self {
|
|
||||||
CleanupState {
|
|
||||||
needs_cleanup_set: false,
|
|
||||||
needs_cleanup_prepare: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn needs_cleanup(&self) -> bool {
|
|
||||||
self.needs_cleanup_set || self.needs_cleanup_prepare
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_true(&mut self) {
|
|
||||||
self.needs_cleanup_set = true;
|
|
||||||
self.needs_cleanup_prepare = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reset(&mut self) {
|
|
||||||
self.needs_cleanup_set = false;
|
|
||||||
self.needs_cleanup_prepare = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for CleanupState {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"SET: {}, PREPARE: {}",
|
|
||||||
self.needs_cleanup_set, self.needs_cleanup_prepare
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Server state.
|
/// Server state.
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
@@ -151,11 +30,15 @@ pub struct Server {
|
|||||||
/// port, e.g. 5432, and role, e.g. primary or replica.
|
/// port, e.g. 5432, and role, e.g. primary or replica.
|
||||||
address: Address,
|
address: Address,
|
||||||
|
|
||||||
/// Server TCP connection.
|
/// Buffered read socket.
|
||||||
stream: BufStream<StreamInner>,
|
read: BufReader<OwnedReadHalf>,
|
||||||
|
|
||||||
|
/// Unbuffered write socket (our client code buffers).
|
||||||
|
write: OwnedWriteHalf,
|
||||||
|
|
||||||
/// Our server response buffer. We buffer data before we give it to the client.
|
/// Our server response buffer. We buffer data before we give it to the client.
|
||||||
buffer: BytesMut,
|
buffer: BytesMut,
|
||||||
|
is_async: bool,
|
||||||
|
|
||||||
/// Server information the server sent us over on startup.
|
/// Server information the server sent us over on startup.
|
||||||
server_info: BytesMut,
|
server_info: BytesMut,
|
||||||
@@ -173,8 +56,8 @@ pub struct Server {
|
|||||||
/// Is the server broken? We'll remote it from the pool if so.
|
/// Is the server broken? We'll remote it from the pool if so.
|
||||||
bad: bool,
|
bad: bool,
|
||||||
|
|
||||||
/// If server connection requires DISCARD ALL before checkin
|
/// If server connection requires a DISCARD ALL before checkin
|
||||||
cleanup_state: CleanupState,
|
needs_cleanup: bool,
|
||||||
|
|
||||||
/// Mapping of clients and servers used for query cancellation.
|
/// Mapping of clients and servers used for query cancellation.
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
@@ -192,9 +75,6 @@ pub struct Server {
|
|||||||
last_activity: SystemTime,
|
last_activity: SystemTime,
|
||||||
|
|
||||||
mirror_manager: Option<MirroringManager>,
|
mirror_manager: Option<MirroringManager>,
|
||||||
|
|
||||||
// Associated addresses used
|
|
||||||
addr_set: Option<AddrSet>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
@@ -208,24 +88,6 @@ impl Server {
|
|||||||
stats: Arc<ServerStats>,
|
stats: Arc<ServerStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
) -> Result<Server, Error> {
|
) -> Result<Server, Error> {
|
||||||
let cached_resolver = CACHED_RESOLVER.load();
|
|
||||||
let mut addr_set: Option<AddrSet> = None;
|
|
||||||
|
|
||||||
// If we are caching addresses and hostname is not an IP
|
|
||||||
if cached_resolver.enabled() && address.host.parse::<IpAddr>().is_err() {
|
|
||||||
debug!("Resolving {}", &address.host);
|
|
||||||
addr_set = match cached_resolver.lookup_ip(&address.host).await {
|
|
||||||
Ok(ok) => {
|
|
||||||
debug!("Obtained: {:?}", ok);
|
|
||||||
Some(ok)
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Error trying to resolve {}, ({:?})", &address.host, err);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut stream =
|
let mut stream =
|
||||||
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
|
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
|
||||||
Ok(stream) => stream,
|
Ok(stream) => stream,
|
||||||
@@ -237,88 +99,8 @@ impl Server {
|
|||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// TCP timeouts.
|
|
||||||
configure_socket(&stream);
|
configure_socket(&stream);
|
||||||
|
|
||||||
let config = get_config();
|
|
||||||
|
|
||||||
let mut stream = if config.general.server_tls {
|
|
||||||
// Request a TLS connection
|
|
||||||
ssl_request(&mut stream).await?;
|
|
||||||
|
|
||||||
let response = match stream.read_u8().await {
|
|
||||||
Ok(response) => response as char,
|
|
||||||
Err(err) => {
|
|
||||||
return Err(Error::SocketError(format!(
|
|
||||||
"Server socket error: {:?}",
|
|
||||||
err
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match response {
|
|
||||||
// Server supports TLS
|
|
||||||
'S' => {
|
|
||||||
debug!("Connecting to server using TLS");
|
|
||||||
|
|
||||||
let mut root_store = RootCertStore::empty();
|
|
||||||
root_store.add_server_trust_anchors(
|
|
||||||
webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
|
|
||||||
OwnedTrustAnchor::from_subject_spki_name_constraints(
|
|
||||||
ta.subject,
|
|
||||||
ta.spki,
|
|
||||||
ta.name_constraints,
|
|
||||||
)
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut tls_config = rustls::ClientConfig::builder()
|
|
||||||
.with_safe_defaults()
|
|
||||||
.with_root_certificates(root_store)
|
|
||||||
.with_no_client_auth();
|
|
||||||
|
|
||||||
// Equivalent to sslmode=prefer which is fine most places.
|
|
||||||
// If you want verify-full, change `verify_server_certificate` to true.
|
|
||||||
if !config.general.verify_server_certificate {
|
|
||||||
let mut dangerous = tls_config.dangerous();
|
|
||||||
dangerous.set_certificate_verifier(Arc::new(
|
|
||||||
crate::tls::NoCertificateVerification {},
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let connector = TlsConnector::from(Arc::new(tls_config));
|
|
||||||
let stream = match connector
|
|
||||||
.connect(address.host.as_str().try_into().unwrap(), stream)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(stream) => stream,
|
|
||||||
Err(err) => {
|
|
||||||
return Err(Error::SocketError(format!("Server TLS error: {:?}", err)))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamInner::Tls { stream }
|
|
||||||
}
|
|
||||||
|
|
||||||
// Server does not support TLS
|
|
||||||
'N' => StreamInner::Plain { stream },
|
|
||||||
|
|
||||||
// Something else?
|
|
||||||
m => {
|
|
||||||
return Err(Error::SocketError(format!(
|
|
||||||
"Unknown message: {}",
|
|
||||||
m as char
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
StreamInner::Plain { stream }
|
|
||||||
};
|
|
||||||
|
|
||||||
// let (read, write) = split(stream);
|
|
||||||
// let (mut read, mut write) = (ReadInner::Plain { stream: read }, WriteInner::Plain { stream: write });
|
|
||||||
|
|
||||||
trace!("Sending StartupMessage");
|
trace!("Sending StartupMessage");
|
||||||
|
|
||||||
// StartupMessage
|
// StartupMessage
|
||||||
@@ -464,7 +246,7 @@ impl Server {
|
|||||||
|
|
||||||
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
|
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
|
||||||
|
|
||||||
if sasl_type.contains(SCRAM_SHA_256) {
|
if sasl_type == SCRAM_SHA_256 {
|
||||||
debug!("Using {}", SCRAM_SHA_256);
|
debug!("Using {}", SCRAM_SHA_256);
|
||||||
|
|
||||||
// Generate client message.
|
// Generate client message.
|
||||||
@@ -487,7 +269,7 @@ impl Server {
|
|||||||
res.put_i32(sasl_response.len() as i32);
|
res.put_i32(sasl_response.len() as i32);
|
||||||
res.put(sasl_response);
|
res.put(sasl_response);
|
||||||
|
|
||||||
write_all_flush(&mut stream, &res).await?;
|
write_all(&mut stream, res).await?;
|
||||||
} else {
|
} else {
|
||||||
error!("Unsupported SCRAM version: {}", sasl_type);
|
error!("Unsupported SCRAM version: {}", sasl_type);
|
||||||
return Err(Error::ServerError);
|
return Err(Error::ServerError);
|
||||||
@@ -518,7 +300,7 @@ impl Server {
|
|||||||
res.put_i32(4 + sasl_response.len() as i32);
|
res.put_i32(4 + sasl_response.len() as i32);
|
||||||
res.put(sasl_response);
|
res.put(sasl_response);
|
||||||
|
|
||||||
write_all_flush(&mut stream, &res).await?;
|
write_all(&mut stream, res).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
SASL_FINAL => {
|
SASL_FINAL => {
|
||||||
@@ -662,19 +444,22 @@ impl Server {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let (read, write) = stream.into_split();
|
||||||
|
|
||||||
let mut server = Server {
|
let mut server = Server {
|
||||||
address: address.clone(),
|
address: address.clone(),
|
||||||
stream: BufStream::new(stream),
|
read: BufReader::new(read),
|
||||||
|
write,
|
||||||
buffer: BytesMut::with_capacity(8196),
|
buffer: BytesMut::with_capacity(8196),
|
||||||
|
is_async: false,
|
||||||
server_info,
|
server_info,
|
||||||
process_id,
|
process_id,
|
||||||
secret_key,
|
secret_key,
|
||||||
in_transaction: false,
|
in_transaction: false,
|
||||||
data_available: false,
|
data_available: false,
|
||||||
bad: false,
|
bad: false,
|
||||||
cleanup_state: CleanupState::new(),
|
needs_cleanup: false,
|
||||||
client_server_map,
|
client_server_map,
|
||||||
addr_set,
|
|
||||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||||
stats,
|
stats,
|
||||||
application_name: String::new(),
|
application_name: String::new(),
|
||||||
@@ -732,7 +517,7 @@ impl Server {
|
|||||||
bytes.put_i32(process_id);
|
bytes.put_i32(process_id);
|
||||||
bytes.put_i32(secret_key);
|
bytes.put_i32(secret_key);
|
||||||
|
|
||||||
write_all_flush(&mut stream, &bytes).await
|
write_all(&mut stream, bytes).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send messages to the server from the client.
|
/// Send messages to the server from the client.
|
||||||
@@ -740,35 +525,39 @@ impl Server {
|
|||||||
self.mirror_send(messages);
|
self.mirror_send(messages);
|
||||||
self.stats().data_sent(messages.len());
|
self.stats().data_sent(messages.len());
|
||||||
|
|
||||||
match write_all_flush(&mut self.stream, &messages).await {
|
match write_all_half(&mut self.write, messages).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
// Successfully sent to server
|
// Successfully sent to server
|
||||||
self.last_activity = SystemTime::now();
|
self.last_activity = SystemTime::now();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!("Terminating server because of: {:?}", err);
|
||||||
"Terminating server {:?} because of: {:?}",
|
|
||||||
self.address, err
|
|
||||||
);
|
|
||||||
self.bad = true;
|
self.bad = true;
|
||||||
Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Switch to async mode, flushing messages as soon
|
||||||
|
/// as we receive them without buffering or waiting for "ReadyForQuery".
|
||||||
|
pub fn switch_async(&mut self, on: bool) {
|
||||||
|
if on {
|
||||||
|
self.is_async = true;
|
||||||
|
} else {
|
||||||
|
self.is_async = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Receive data from the server in response to a client request.
|
/// Receive data from the server in response to a client request.
|
||||||
/// This method must be called multiple times while `self.is_data_available()` is true
|
/// This method must be called multiple times while `self.is_data_available()` is true
|
||||||
/// in order to receive all data the server has to offer.
|
/// in order to receive all data the server has to offer.
|
||||||
pub async fn recv(&mut self) -> Result<BytesMut, Error> {
|
pub async fn recv(&mut self) -> Result<BytesMut, Error> {
|
||||||
loop {
|
loop {
|
||||||
let mut message = match read_message(&mut self.stream).await {
|
let mut message = match read_message(&mut self.read).await {
|
||||||
Ok(message) => message,
|
Ok(message) => message,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!("Terminating server because of: {:?}", err);
|
||||||
"Terminating server {:?} because of: {:?}",
|
|
||||||
self.address, err
|
|
||||||
);
|
|
||||||
self.bad = true;
|
self.bad = true;
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
@@ -835,12 +624,12 @@ impl Server {
|
|||||||
// This will reduce amount of discard statements sent
|
// This will reduce amount of discard statements sent
|
||||||
if !self.in_transaction {
|
if !self.in_transaction {
|
||||||
debug!("Server connection marked for clean up");
|
debug!("Server connection marked for clean up");
|
||||||
self.cleanup_state.needs_cleanup_set = true;
|
self.needs_cleanup = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"PREPARE\0" => {
|
"PREPARE\0" => {
|
||||||
debug!("Server connection marked for clean up");
|
debug!("Server connection marked for clean up");
|
||||||
self.cleanup_state.needs_cleanup_prepare = true;
|
self.needs_cleanup = true;
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
@@ -855,7 +644,10 @@ impl Server {
|
|||||||
// DataRow
|
// DataRow
|
||||||
'D' => {
|
'D' => {
|
||||||
// More data is available after this message, this is not the end of the reply.
|
// More data is available after this message, this is not the end of the reply.
|
||||||
self.data_available = true;
|
// If we're async, flush to client now.
|
||||||
|
if !self.is_async {
|
||||||
|
self.data_available = true;
|
||||||
|
}
|
||||||
|
|
||||||
// Don't flush yet, the more we buffer, the faster this goes...up to a limit.
|
// Don't flush yet, the more we buffer, the faster this goes...up to a limit.
|
||||||
if self.buffer.len() >= 8196 {
|
if self.buffer.len() >= 8196 {
|
||||||
@@ -868,7 +660,10 @@ impl Server {
|
|||||||
|
|
||||||
// CopyOutResponse: copy is starting from the server to the client.
|
// CopyOutResponse: copy is starting from the server to the client.
|
||||||
'H' => {
|
'H' => {
|
||||||
self.data_available = true;
|
// If we're in async mode, flush now.
|
||||||
|
if !self.is_async {
|
||||||
|
self.data_available = true;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -888,6 +683,10 @@ impl Server {
|
|||||||
// Keep buffering until ReadyForQuery shows up.
|
// Keep buffering until ReadyForQuery shows up.
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if self.is_async {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let bytes = self.buffer.clone();
|
let bytes = self.buffer.clone();
|
||||||
@@ -921,23 +720,7 @@ impl Server {
|
|||||||
/// Server & client are out of sync, we must discard this connection.
|
/// Server & client are out of sync, we must discard this connection.
|
||||||
/// This happens with clients that misbehave.
|
/// This happens with clients that misbehave.
|
||||||
pub fn is_bad(&self) -> bool {
|
pub fn is_bad(&self) -> bool {
|
||||||
if self.bad {
|
self.bad
|
||||||
return self.bad;
|
|
||||||
};
|
|
||||||
let cached_resolver = CACHED_RESOLVER.load();
|
|
||||||
if cached_resolver.enabled() {
|
|
||||||
if let Some(addr_set) = &self.addr_set {
|
|
||||||
if cached_resolver.has_changed(self.address.host.as_str(), addr_set) {
|
|
||||||
warn!(
|
|
||||||
"DNS changed for {}, it was {:?}. Dropping server connection.",
|
|
||||||
self.address.host.as_str(),
|
|
||||||
addr_set
|
|
||||||
);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get server startup information to forward it to the client.
|
/// Get server startup information to forward it to the client.
|
||||||
@@ -970,8 +753,6 @@ impl Server {
|
|||||||
/// It will use the simple query protocol.
|
/// It will use the simple query protocol.
|
||||||
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
||||||
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
|
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
|
||||||
debug!("Running `{}` on server {:?}", query, self.address);
|
|
||||||
|
|
||||||
let query = simple_query(query);
|
let query = simple_query(query);
|
||||||
|
|
||||||
self.send(&query).await?;
|
self.send(&query).await?;
|
||||||
@@ -1004,11 +785,10 @@ impl Server {
|
|||||||
// to avoid leaking state between clients. For performance reasons we only
|
// to avoid leaking state between clients. For performance reasons we only
|
||||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||||
// it before each checkin.
|
// it before each checkin.
|
||||||
if self.cleanup_state.needs_cleanup() {
|
if self.needs_cleanup {
|
||||||
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
|
warn!("Server returned with session state altered, discarding state");
|
||||||
self.query("DISCARD ALL").await?;
|
self.query("DISCARD ALL").await?;
|
||||||
self.query("RESET ROLE").await?;
|
self.needs_cleanup = false;
|
||||||
self.cleanup_state.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -1020,12 +800,12 @@ impl Server {
|
|||||||
self.application_name = name.to_string();
|
self.application_name = name.to_string();
|
||||||
// We don't want `SET application_name` to mark the server connection
|
// We don't want `SET application_name` to mark the server connection
|
||||||
// as needing cleanup
|
// as needing cleanup
|
||||||
let needs_cleanup_before = self.cleanup_state;
|
let needs_cleanup_before = self.needs_cleanup;
|
||||||
|
|
||||||
let result = Ok(self
|
let result = Ok(self
|
||||||
.query(&format!("SET application_name = '{}'", name))
|
.query(&format!("SET application_name = '{}'", name))
|
||||||
.await?);
|
.await?);
|
||||||
self.cleanup_state = needs_cleanup_before;
|
self.needs_cleanup = needs_cleanup_before;
|
||||||
result
|
result
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -1050,7 +830,7 @@ impl Server {
|
|||||||
|
|
||||||
// Marks a connection as needing DISCARD ALL at checkin
|
// Marks a connection as needing DISCARD ALL at checkin
|
||||||
pub fn mark_dirty(&mut self) {
|
pub fn mark_dirty(&mut self) {
|
||||||
self.cleanup_state.set_true();
|
self.needs_cleanup = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mirror_send(&mut self, bytes: &BytesMut) {
|
pub fn mirror_send(&mut self, bytes: &BytesMut) {
|
||||||
@@ -1177,27 +957,23 @@ impl Drop for Server {
|
|||||||
// Update statistics
|
// Update statistics
|
||||||
self.stats.disconnect();
|
self.stats.disconnect();
|
||||||
|
|
||||||
let mut bytes = BytesMut::with_capacity(5);
|
let mut bytes = BytesMut::with_capacity(4);
|
||||||
bytes.put_u8(b'X');
|
bytes.put_u8(b'X');
|
||||||
bytes.put_i32(4);
|
bytes.put_i32(4);
|
||||||
|
|
||||||
match self.stream.get_mut().try_write(&bytes) {
|
match self.write.try_write(&bytes) {
|
||||||
Ok(5) => (),
|
Ok(_) => (),
|
||||||
_ => debug!("Dirty shutdown"),
|
Err(_) => debug!("Dirty shutdown"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Should not matter.
|
||||||
|
self.bad = true;
|
||||||
|
|
||||||
let now = chrono::offset::Utc::now().naive_utc();
|
let now = chrono::offset::Utc::now().naive_utc();
|
||||||
let duration = now - self.connected_at;
|
let duration = now - self.connected_at;
|
||||||
|
|
||||||
let message = if self.bad {
|
|
||||||
"Server connection terminated"
|
|
||||||
} else {
|
|
||||||
"Server connection closed"
|
|
||||||
};
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"{} {:?}, session duration: {}",
|
"Server connection closed {:?}, session duration: {}",
|
||||||
message,
|
|
||||||
self.address,
|
self.address,
|
||||||
crate::format_duration(&duration)
|
crate::format_duration(&duration)
|
||||||
);
|
);
|
||||||
|
|||||||
15
src/stats.rs
15
src/stats.rs
@@ -107,19 +107,8 @@ impl Collector {
|
|||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
||||||
// Hold read lock for duration of update to retain all server stats
|
for stats in SERVER_STATS.read().values() {
|
||||||
let server_stats = SERVER_STATS.read();
|
stats.address_stats().update_averages();
|
||||||
|
|
||||||
for stats in server_stats.values() {
|
|
||||||
if !stats.check_address_stat_average_is_updated_status() {
|
|
||||||
stats.address_stats().update_averages();
|
|
||||||
stats.set_address_stat_average_is_updated_status(true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset to false for next update
|
|
||||||
for stats in server_stats.values() {
|
|
||||||
stats.set_address_stat_average_is_updated_status(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use log::warn;
|
||||||
use std::sync::atomic::*;
|
use std::sync::atomic::*;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -12,16 +13,6 @@ pub struct AddressStats {
|
|||||||
pub total_query_time: Arc<AtomicU64>,
|
pub total_query_time: Arc<AtomicU64>,
|
||||||
pub total_wait_time: Arc<AtomicU64>,
|
pub total_wait_time: Arc<AtomicU64>,
|
||||||
pub total_errors: Arc<AtomicU64>,
|
pub total_errors: Arc<AtomicU64>,
|
||||||
|
|
||||||
pub old_total_xact_count: Arc<AtomicU64>,
|
|
||||||
pub old_total_query_count: Arc<AtomicU64>,
|
|
||||||
pub old_total_received: Arc<AtomicU64>,
|
|
||||||
pub old_total_sent: Arc<AtomicU64>,
|
|
||||||
pub old_total_xact_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_query_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_wait_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_errors: Arc<AtomicU64>,
|
|
||||||
|
|
||||||
pub avg_query_count: Arc<AtomicU64>,
|
pub avg_query_count: Arc<AtomicU64>,
|
||||||
pub avg_query_time: Arc<AtomicU64>,
|
pub avg_query_time: Arc<AtomicU64>,
|
||||||
pub avg_recv: Arc<AtomicU64>,
|
pub avg_recv: Arc<AtomicU64>,
|
||||||
@@ -30,9 +21,6 @@ pub struct AddressStats {
|
|||||||
pub avg_xact_time: Arc<AtomicU64>,
|
pub avg_xact_time: Arc<AtomicU64>,
|
||||||
pub avg_xact_count: Arc<AtomicU64>,
|
pub avg_xact_count: Arc<AtomicU64>,
|
||||||
pub avg_wait_time: Arc<AtomicU64>,
|
pub avg_wait_time: Arc<AtomicU64>,
|
||||||
|
|
||||||
// Determines if the averages have been updated since the last time they were reported
|
|
||||||
pub averages_updated: Arc<AtomicBool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoIterator for AddressStats {
|
impl IntoIterator for AddressStats {
|
||||||
@@ -116,15 +104,16 @@ impl AddressStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_averages(&self) {
|
pub fn update_averages(&self) {
|
||||||
let (totals, averages, old_totals) = self.fields_iterators();
|
let (totals, averages) = self.fields_iterators();
|
||||||
for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) {
|
for data in totals.iter().zip(averages.iter()) {
|
||||||
let total_value = total.load(Ordering::Relaxed);
|
let (total, average) = data;
|
||||||
let old_total_value = old_total.load(Ordering::Relaxed);
|
if let Err(err) = average.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
|
||||||
average.store(
|
let total = total.load(Ordering::Relaxed);
|
||||||
(total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000),
|
let avg = (total - avg) / (crate::stats::STAT_PERIOD / 1_000); // Avg / second
|
||||||
Ordering::Relaxed,
|
Some(avg)
|
||||||
); // Avg / second
|
}) {
|
||||||
old_total.store(total_value, Ordering::Relaxed);
|
warn!("Could not update averages for addresses stats, {:?}", err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,42 +123,27 @@ impl AddressStats {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fields_iterators(
|
fn fields_iterators(&self) -> (Vec<Arc<AtomicU64>>, Vec<Arc<AtomicU64>>) {
|
||||||
&self,
|
|
||||||
) -> (
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
) {
|
|
||||||
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
|
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
|
||||||
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
|
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
|
||||||
let mut old_totals: Vec<Arc<AtomicU64>> = Vec::new();
|
|
||||||
|
|
||||||
totals.push(self.total_xact_count.clone());
|
totals.push(self.total_xact_count.clone());
|
||||||
old_totals.push(self.old_total_xact_count.clone());
|
|
||||||
averages.push(self.avg_xact_count.clone());
|
averages.push(self.avg_xact_count.clone());
|
||||||
totals.push(self.total_query_count.clone());
|
totals.push(self.total_query_count.clone());
|
||||||
old_totals.push(self.old_total_query_count.clone());
|
|
||||||
averages.push(self.avg_query_count.clone());
|
averages.push(self.avg_query_count.clone());
|
||||||
totals.push(self.total_received.clone());
|
totals.push(self.total_received.clone());
|
||||||
old_totals.push(self.old_total_received.clone());
|
|
||||||
averages.push(self.avg_recv.clone());
|
averages.push(self.avg_recv.clone());
|
||||||
totals.push(self.total_sent.clone());
|
totals.push(self.total_sent.clone());
|
||||||
old_totals.push(self.old_total_sent.clone());
|
|
||||||
averages.push(self.avg_sent.clone());
|
averages.push(self.avg_sent.clone());
|
||||||
totals.push(self.total_xact_time.clone());
|
totals.push(self.total_xact_time.clone());
|
||||||
old_totals.push(self.old_total_xact_time.clone());
|
|
||||||
averages.push(self.avg_xact_time.clone());
|
averages.push(self.avg_xact_time.clone());
|
||||||
totals.push(self.total_query_time.clone());
|
totals.push(self.total_query_time.clone());
|
||||||
old_totals.push(self.old_total_query_time.clone());
|
|
||||||
averages.push(self.avg_query_time.clone());
|
averages.push(self.avg_query_time.clone());
|
||||||
totals.push(self.total_wait_time.clone());
|
totals.push(self.total_wait_time.clone());
|
||||||
old_totals.push(self.old_total_wait_time.clone());
|
|
||||||
averages.push(self.avg_wait_time.clone());
|
averages.push(self.avg_wait_time.clone());
|
||||||
totals.push(self.total_errors.clone());
|
totals.push(self.total_errors.clone());
|
||||||
old_totals.push(self.old_total_errors.clone());
|
|
||||||
averages.push(self.avg_errors.clone());
|
averages.push(self.avg_errors.clone());
|
||||||
|
|
||||||
(totals, averages, old_totals)
|
(totals, averages)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -139,17 +139,6 @@ impl ServerStats {
|
|||||||
self.address.stats.clone()
|
self.address.stats.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_address_stat_average_is_updated_status(&self) -> bool {
|
|
||||||
self.address.stats.averages_updated.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_address_stat_average_is_updated_status(&self, is_checked: bool) {
|
|
||||||
self.address
|
|
||||||
.stats
|
|
||||||
.averages_updated
|
|
||||||
.store(is_checked, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper methods for show_servers
|
// Helper methods for show_servers
|
||||||
pub fn pool_name(&self) -> String {
|
pub fn pool_name(&self) -> String {
|
||||||
self.pool_stats.database()
|
self.pool_stats.database()
|
||||||
|
|||||||
23
src/tls.rs
23
src/tls.rs
@@ -4,12 +4,7 @@ use rustls_pemfile::{certs, read_one, Item};
|
|||||||
use std::iter;
|
use std::iter;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::SystemTime;
|
use tokio_rustls::rustls::{self, Certificate, PrivateKey};
|
||||||
use tokio_rustls::rustls::{
|
|
||||||
self,
|
|
||||||
client::{ServerCertVerified, ServerCertVerifier},
|
|
||||||
Certificate, PrivateKey, ServerName,
|
|
||||||
};
|
|
||||||
use tokio_rustls::TlsAcceptor;
|
use tokio_rustls::TlsAcceptor;
|
||||||
|
|
||||||
use crate::config::get_config;
|
use crate::config::get_config;
|
||||||
@@ -69,19 +64,3 @@ impl Tls {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NoCertificateVerification;
|
|
||||||
|
|
||||||
impl ServerCertVerifier for NoCertificateVerification {
|
|
||||||
fn verify_server_cert(
|
|
||||||
&self,
|
|
||||||
_end_entity: &Certificate,
|
|
||||||
_intermediates: &[Certificate],
|
|
||||||
_server_name: &ServerName,
|
|
||||||
_scts: &mut dyn Iterator<Item = &[u8]>,
|
|
||||||
_ocsp_response: &[u8],
|
|
||||||
_now: SystemTime,
|
|
||||||
) -> Result<ServerCertVerified, rustls::Error> {
|
|
||||||
Ok(ServerCertVerified::assertion())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
60
tests/python/async_test.py
Normal file
60
tests/python/async_test.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
import psycopg2
|
||||||
|
import asyncio
|
||||||
|
import asyncpg
|
||||||
|
|
||||||
|
PGCAT_HOST = "127.0.0.1"
|
||||||
|
PGCAT_PORT = "6432"
|
||||||
|
|
||||||
|
|
||||||
|
def regular_main():
|
||||||
|
# Connect to the PostgreSQL database
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
host=PGCAT_HOST,
|
||||||
|
database="sharded_db",
|
||||||
|
user="sharding_user",
|
||||||
|
password="sharding_user",
|
||||||
|
port=PGCAT_PORT,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Open a cursor to perform database operations
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
# Execute a SQL query
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
|
||||||
|
# Fetch the results
|
||||||
|
rows = cur.fetchall()
|
||||||
|
|
||||||
|
# Print the results
|
||||||
|
for row in rows:
|
||||||
|
print(row[0])
|
||||||
|
|
||||||
|
# Close the cursor and the database connection
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
# Connect to the PostgreSQL database
|
||||||
|
conn = await asyncpg.connect(
|
||||||
|
host=PGCAT_HOST,
|
||||||
|
database="sharded_db",
|
||||||
|
user="sharding_user",
|
||||||
|
password="sharding_user",
|
||||||
|
port=PGCAT_PORT,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute a SQL query
|
||||||
|
for _ in range(25):
|
||||||
|
rows = await conn.fetch("SELECT 1")
|
||||||
|
|
||||||
|
# Print the results
|
||||||
|
for row in rows:
|
||||||
|
print(row[0])
|
||||||
|
|
||||||
|
# Close the database connection
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
regular_main()
|
||||||
|
asyncio.run(main())
|
||||||
@@ -1,2 +1,11 @@
|
|||||||
|
asyncio==3.4.3
|
||||||
|
asyncpg==0.27.0
|
||||||
|
black==23.3.0
|
||||||
|
click==8.1.3
|
||||||
|
mypy-extensions==1.0.0
|
||||||
|
packaging==23.1
|
||||||
|
pathspec==0.11.1
|
||||||
|
platformdirs==3.2.0
|
||||||
|
psutil==5.9.1
|
||||||
psycopg2==2.9.3
|
psycopg2==2.9.3
|
||||||
psutil==5.9.1
|
tomli==2.0.1
|
||||||
|
|||||||
@@ -14,12 +14,11 @@ describe "Admin" do
|
|||||||
describe "SHOW STATS" do
|
describe "SHOW STATS" do
|
||||||
context "clients connect and make one query" do
|
context "clients connect and make one query" do
|
||||||
it "updates *_query_time and *_wait_time" do
|
it "updates *_query_time and *_wait_time" do
|
||||||
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
connection = PG::connect("#{pgcat_conn_str}?application_name=one_query")
|
||||||
connections.each do |c|
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
end
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
sleep(1)
|
connection.close
|
||||||
connections.map(&:close)
|
|
||||||
|
|
||||||
# wait for averages to be calculated, we shouldn't do this too often
|
# wait for averages to be calculated, we shouldn't do this too often
|
||||||
sleep(15.5)
|
sleep(15.5)
|
||||||
@@ -27,7 +26,7 @@ describe "Admin" do
|
|||||||
results = admin_conn.async_exec("SHOW STATS")[0]
|
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||||
admin_conn.close
|
admin_conn.close
|
||||||
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
||||||
expect(results["avg_query_time"].to_i).to be_within(20).of(50)
|
expect(results["avg_query_time"].to_i).to_not eq(0)
|
||||||
|
|
||||||
expect(results["total_wait_time"].to_i).to_not eq(0)
|
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||||
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||||
@@ -72,17 +71,15 @@ describe "Admin" do
|
|||||||
|
|
||||||
context "client connects but issues no queries" do
|
context "client connects but issues no queries" do
|
||||||
it "only affects cl_idle stats" do
|
it "only affects cl_idle stats" do
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
|
||||||
|
|
||||||
before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
|
|
||||||
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
expect(results["cl_idle"]).to eq("20")
|
expect(results["cl_idle"]).to eq("20")
|
||||||
expect(results["sv_idle"]).to eq(before_test)
|
expect(results["sv_idle"]).to eq("1")
|
||||||
|
|
||||||
connections.map(&:close)
|
connections.map(&:close)
|
||||||
sleep(1.1)
|
sleep(1.1)
|
||||||
@@ -90,7 +87,7 @@ describe "Admin" do
|
|||||||
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
expect(results["sv_idle"]).to eq(before_test)
|
expect(results["sv_idle"]).to eq("1")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -41,24 +41,7 @@ module Helpers
|
|||||||
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
||||||
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
||||||
},
|
},
|
||||||
"users" => { "0" => user },
|
"users" => { "0" => user }
|
||||||
"plugins" => {
|
|
||||||
"intercept" => {
|
|
||||||
"enabled" => true,
|
|
||||||
"queries" => {
|
|
||||||
"0" => {
|
|
||||||
"query" => "select current_database() as a, current_schemas(false) as b",
|
|
||||||
"schema" => [
|
|
||||||
["a", "text"],
|
|
||||||
["b", "text"],
|
|
||||||
],
|
|
||||||
"result" => [
|
|
||||||
["${DATABASE}", "{public}"],
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pgcat.update_config(pgcat_cfg)
|
pgcat.update_config(pgcat_cfg)
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ describe "Query Mirroing" do
|
|||||||
processes.pgcat.shutdown
|
processes.pgcat.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
xit "can mirror a query" do
|
it "can mirror a query" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
runs = 15
|
runs = 15
|
||||||
runs.times { conn.async_exec("SELECT 1 + 2") }
|
runs.times { conn.async_exec("SELECT 1 + 2") }
|
||||||
|
|||||||
@@ -241,18 +241,6 @@ describe "Miscellaneous" do
|
|||||||
|
|
||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
||||||
end
|
end
|
||||||
|
|
||||||
it "Resets server roles correctly" do
|
|
||||||
10.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
conn.async_exec("SELECT 1")
|
|
||||||
conn.async_exec("SET statement_timeout to 5000")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_query("RESET ROLE")).to eq(10)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
context "transaction mode" do
|
context "transaction mode" do
|
||||||
|
|||||||
@@ -1,14 +0,0 @@
|
|||||||
require_relative 'spec_helper'
|
|
||||||
|
|
||||||
|
|
||||||
describe "Plugins" do
|
|
||||||
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
|
|
||||||
|
|
||||||
context "intercept" do
|
|
||||||
it "will intercept an intellij query" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
res = conn.exec("select current_database() as a, current_schemas(false) as b")
|
|
||||||
expect(res.values).to eq([["sharded_db", "{public}"]])
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
Reference in New Issue
Block a user