mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 10:26:30 +00:00
Compare commits
10 Commits
dependabot
...
levkk-star
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee23b374ae | ||
|
|
9dffebccbf | ||
|
|
4c8358b8b3 | ||
|
|
f0d1916a98 | ||
|
|
bba5f10be1 | ||
|
|
a514dbc187 | ||
|
|
d660e3e565 | ||
|
|
0d882cc204 | ||
|
|
b36746a47b | ||
|
|
9e51b8110f |
16
CONFIG.md
16
CONFIG.md
@@ -188,22 +188,6 @@ default: "admin_pass"
|
|||||||
|
|
||||||
Password to access the virtual administrative database
|
Password to access the virtual administrative database
|
||||||
|
|
||||||
### dns_cache_enabled
|
|
||||||
```
|
|
||||||
path: general.dns_cache_enabled
|
|
||||||
default: false
|
|
||||||
```
|
|
||||||
When enabled, ip resolutions for server connections specified using hostnames will be cached
|
|
||||||
and checked for changes every `dns_max_ttl` seconds. If a change in the host resolution is found
|
|
||||||
old ip connections are closed (gracefully) and new connections will start using new ip.
|
|
||||||
|
|
||||||
### dns_max_ttl
|
|
||||||
```
|
|
||||||
path: general.dns_max_ttl
|
|
||||||
default: 30
|
|
||||||
```
|
|
||||||
Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
|
|
||||||
|
|
||||||
## `pools.<pool_name>` Section
|
## `pools.<pool_name>` Section
|
||||||
|
|
||||||
### pool_mode
|
### pool_mode
|
||||||
|
|||||||
467
Cargo.lock
generated
467
Cargo.lock
generated
@@ -26,27 +26,6 @@ version = "1.6.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
|
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-stream"
|
|
||||||
version = "0.3.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
|
|
||||||
dependencies = [
|
|
||||||
"async-stream-impl",
|
|
||||||
"futures-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-stream-impl"
|
|
||||||
version = "0.3.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
version = "0.1.68"
|
version = "0.1.68"
|
||||||
@@ -233,12 +212,6 @@ dependencies = [
|
|||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "data-encoding"
|
|
||||||
version = "2.3.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "digest"
|
name = "digest"
|
||||||
version = "0.10.6"
|
version = "0.10.6"
|
||||||
@@ -250,24 +223,6 @@ dependencies = [
|
|||||||
"subtle",
|
"subtle",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "either"
|
|
||||||
version = "1.8.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "enum-as-inner"
|
|
||||||
version = "0.5.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116"
|
|
||||||
dependencies = [
|
|
||||||
"heck",
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "env_logger"
|
name = "env_logger"
|
||||||
version = "0.10.0"
|
version = "0.10.0"
|
||||||
@@ -320,15 +275,6 @@ version = "1.0.7"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "form_urlencoded"
|
|
||||||
version = "1.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
|
|
||||||
dependencies = [
|
|
||||||
"percent-encoding",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures"
|
name = "futures"
|
||||||
version = "0.3.28"
|
version = "0.3.28"
|
||||||
@@ -464,12 +410,6 @@ version = "0.12.3"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "heck"
|
|
||||||
version = "0.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.2.6"
|
version = "0.2.6"
|
||||||
@@ -494,17 +434,6 @@ dependencies = [
|
|||||||
"digest",
|
"digest",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "hostname"
|
|
||||||
version = "0.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
"match_cfg",
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http"
|
name = "http"
|
||||||
version = "0.2.9"
|
version = "0.2.9"
|
||||||
@@ -562,7 +491,7 @@ dependencies = [
|
|||||||
"httpdate",
|
"httpdate",
|
||||||
"itoa",
|
"itoa",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"socket2 0.4.9",
|
"socket2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
@@ -593,27 +522,6 @@ dependencies = [
|
|||||||
"cxx-build",
|
"cxx-build",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "idna"
|
|
||||||
version = "0.2.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
|
|
||||||
dependencies = [
|
|
||||||
"matches",
|
|
||||||
"unicode-bidi",
|
|
||||||
"unicode-normalization",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "idna"
|
|
||||||
version = "0.3.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
|
|
||||||
dependencies = [
|
|
||||||
"unicode-bidi",
|
|
||||||
"unicode-normalization",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "indexmap"
|
name = "indexmap"
|
||||||
version = "1.9.2"
|
version = "1.9.2"
|
||||||
@@ -631,27 +539,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
|
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ipconfig"
|
|
||||||
version = "0.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "bd302af1b90f2463a98fa5ad469fc212c8e3175a41c3068601bfa2727591c5be"
|
|
||||||
dependencies = [
|
|
||||||
"socket2 0.4.9",
|
|
||||||
"widestring",
|
|
||||||
"winapi",
|
|
||||||
"winreg",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ipnet"
|
|
||||||
version = "2.5.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f88c5561171189e69df9d98bcf18fd5f9558300f7ea7b801eb8a0fd748bd8745"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "is-terminal"
|
name = "is-terminal"
|
||||||
version = "0.4.4"
|
version = "0.4.4"
|
||||||
@@ -661,16 +551,7 @@ dependencies = [
|
|||||||
"hermit-abi 0.3.1",
|
"hermit-abi 0.3.1",
|
||||||
"io-lifetimes",
|
"io-lifetimes",
|
||||||
"rustix",
|
"rustix",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "itertools"
|
|
||||||
version = "0.10.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
|
|
||||||
dependencies = [
|
|
||||||
"either",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -708,17 +589,11 @@ dependencies = [
|
|||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "lazy_static"
|
|
||||||
version = "1.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.144"
|
version = "0.2.139"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "link-cplusplus"
|
name = "link-cplusplus"
|
||||||
@@ -729,12 +604,6 @@ dependencies = [
|
|||||||
"cc",
|
"cc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "linked-hash-map"
|
|
||||||
version = "0.5.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "linux-raw-sys"
|
name = "linux-raw-sys"
|
||||||
version = "0.1.4"
|
version = "0.1.4"
|
||||||
@@ -760,27 +629,6 @@ dependencies = [
|
|||||||
"cfg-if",
|
"cfg-if",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "lru-cache"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
|
|
||||||
dependencies = [
|
|
||||||
"linked-hash-map",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "match_cfg"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "matches"
|
|
||||||
version = "0.1.9"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "md-5"
|
name = "md-5"
|
||||||
version = "0.10.5"
|
version = "0.10.5"
|
||||||
@@ -814,7 +662,7 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -886,18 +734,12 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
"redox_syscall",
|
"redox_syscall",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "percent-encoding"
|
|
||||||
version = "2.2.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.0.2-alpha3"
|
version = "1.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -912,7 +754,6 @@ dependencies = [
|
|||||||
"futures",
|
"futures",
|
||||||
"hmac",
|
"hmac",
|
||||||
"hyper",
|
"hyper",
|
||||||
"itertools",
|
|
||||||
"jemallocator",
|
"jemallocator",
|
||||||
"log",
|
"log",
|
||||||
"md-5",
|
"md-5",
|
||||||
@@ -929,17 +770,14 @@ dependencies = [
|
|||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
|
||||||
"sha-1",
|
"sha-1",
|
||||||
"sha2",
|
"sha2",
|
||||||
"socket2 0.5.3",
|
"socket2",
|
||||||
"sqlparser",
|
"sqlparser",
|
||||||
"stringprep",
|
"stringprep",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
"tokio-test",
|
|
||||||
"toml",
|
"toml",
|
||||||
"trust-dns-resolver",
|
|
||||||
"webpki-roots",
|
"webpki-roots",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -1050,12 +888,6 @@ dependencies = [
|
|||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "quick-error"
|
|
||||||
version = "1.2.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "quote"
|
name = "quote"
|
||||||
version = "1.0.26"
|
version = "1.0.26"
|
||||||
@@ -1106,9 +938,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex"
|
name = "regex"
|
||||||
version = "1.8.1"
|
version = "1.8.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370"
|
checksum = "ac6cf59af1067a3fb53fbe5c88c053764e930f932be1d71d3ffe032cbe147f59"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aho-corasick",
|
"aho-corasick",
|
||||||
"memchr",
|
"memchr",
|
||||||
@@ -1117,19 +949,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex-syntax"
|
name = "regex-syntax"
|
||||||
version = "0.7.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "resolv-conf"
|
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00"
|
checksum = "b6868896879ba532248f33598de5181522d8b3d9d724dfd230911e1a7d4822f5"
|
||||||
dependencies = [
|
|
||||||
"hostname",
|
|
||||||
"quick-error",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ring"
|
name = "ring"
|
||||||
@@ -1157,14 +979,14 @@ dependencies = [
|
|||||||
"io-lifetimes",
|
"io-lifetimes",
|
||||||
"libc",
|
"libc",
|
||||||
"linux-raw-sys",
|
"linux-raw-sys",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustls"
|
name = "rustls"
|
||||||
version = "0.21.1"
|
version = "0.21.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c911ba11bc8433e811ce56fde130ccf32f5127cab0e0194e9c68c5a5b671791e"
|
checksum = "07180898a28ed6a7f7ba2311594308f595e3dd2e3c3812fa0a80a47b45f17e5d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"ring",
|
"ring",
|
||||||
@@ -1191,12 +1013,6 @@ dependencies = [
|
|||||||
"untrusted",
|
"untrusted",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ryu"
|
|
||||||
version = "1.0.13"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "scopeguard"
|
name = "scopeguard"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
@@ -1224,9 +1040,6 @@ name = "serde"
|
|||||||
version = "1.0.160"
|
version = "1.0.160"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
|
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
|
||||||
dependencies = [
|
|
||||||
"serde_derive",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
@@ -1239,17 +1052,6 @@ dependencies = [
|
|||||||
"syn 2.0.9",
|
"syn 2.0.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "serde_json"
|
|
||||||
version = "1.0.96"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
|
|
||||||
dependencies = [
|
|
||||||
"itoa",
|
|
||||||
"ryu",
|
|
||||||
"serde",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_spanned"
|
name = "serde_spanned"
|
||||||
version = "0.6.1"
|
version = "0.6.1"
|
||||||
@@ -1313,24 +1115,14 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "socket2"
|
name = "socket2"
|
||||||
version = "0.4.9"
|
version = "0.4.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
|
checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "socket2"
|
|
||||||
version = "0.5.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
"windows-sys 0.48.0",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "spin"
|
name = "spin"
|
||||||
version = "0.5.2"
|
version = "0.5.2"
|
||||||
@@ -1344,18 +1136,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
|
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "sqlparser_derive"
|
|
||||||
version = "0.1.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1411,26 +1191,6 @@ dependencies = [
|
|||||||
"winapi-util",
|
"winapi-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "thiserror"
|
|
||||||
version = "1.0.37"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
|
|
||||||
dependencies = [
|
|
||||||
"thiserror-impl",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "thiserror-impl"
|
|
||||||
version = "1.0.37"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.1.45"
|
version = "0.1.45"
|
||||||
@@ -1472,9 +1232,9 @@ dependencies = [
|
|||||||
"parking_lot",
|
"parking_lot",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
"socket2 0.4.9",
|
"socket2",
|
||||||
"tokio-macros",
|
"tokio-macros",
|
||||||
"windows-sys 0.45.0",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1498,30 +1258,6 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-stream"
|
|
||||||
version = "0.1.11"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
|
|
||||||
dependencies = [
|
|
||||||
"futures-core",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-test"
|
|
||||||
version = "0.4.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
|
|
||||||
dependencies = [
|
|
||||||
"async-stream",
|
|
||||||
"bytes",
|
|
||||||
"futures-core",
|
|
||||||
"tokio",
|
|
||||||
"tokio-stream",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-util"
|
name = "tokio-util"
|
||||||
version = "0.7.7"
|
version = "0.7.7"
|
||||||
@@ -1584,21 +1320,9 @@ checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tracing-attributes",
|
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-attributes"
|
|
||||||
version = "0.1.23"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing-core"
|
name = "tracing-core"
|
||||||
version = "0.1.30"
|
version = "0.1.30"
|
||||||
@@ -1608,51 +1332,6 @@ dependencies = [
|
|||||||
"once_cell",
|
"once_cell",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "trust-dns-proto"
|
|
||||||
version = "0.22.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4f7f83d1e4a0e4358ac54c5c3681e5d7da5efc5a7a632c90bb6d6669ddd9bc26"
|
|
||||||
dependencies = [
|
|
||||||
"async-trait",
|
|
||||||
"cfg-if",
|
|
||||||
"data-encoding",
|
|
||||||
"enum-as-inner",
|
|
||||||
"futures-channel",
|
|
||||||
"futures-io",
|
|
||||||
"futures-util",
|
|
||||||
"idna 0.2.3",
|
|
||||||
"ipnet",
|
|
||||||
"lazy_static",
|
|
||||||
"rand",
|
|
||||||
"smallvec",
|
|
||||||
"thiserror",
|
|
||||||
"tinyvec",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
"url",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "trust-dns-resolver"
|
|
||||||
version = "0.22.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "aff21aa4dcefb0a1afbfac26deb0adc93888c7d295fb63ab273ef276ba2b7cfe"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"futures-util",
|
|
||||||
"ipconfig",
|
|
||||||
"lazy_static",
|
|
||||||
"lru-cache",
|
|
||||||
"parking_lot",
|
|
||||||
"resolv-conf",
|
|
||||||
"smallvec",
|
|
||||||
"thiserror",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
"trust-dns-proto",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "try-lock"
|
name = "try-lock"
|
||||||
version = "0.2.4"
|
version = "0.2.4"
|
||||||
@@ -1698,17 +1377,6 @@ version = "0.7.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
|
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "url"
|
|
||||||
version = "2.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
|
|
||||||
dependencies = [
|
|
||||||
"form_urlencoded",
|
|
||||||
"idna 0.3.0",
|
|
||||||
"percent-encoding",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "version_check"
|
name = "version_check"
|
||||||
version = "0.9.4"
|
version = "0.9.4"
|
||||||
@@ -1810,12 +1478,6 @@ dependencies = [
|
|||||||
"rustls-webpki",
|
"rustls-webpki",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "widestring"
|
|
||||||
version = "0.5.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "17882f045410753661207383517a6f62ec3dbeb6a4ed2acce01f0728238d1983"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winapi"
|
name = "winapi"
|
||||||
version = "0.3.9"
|
version = "0.3.9"
|
||||||
@@ -1853,16 +1515,7 @@ version = "0.45.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
|
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"windows-targets 0.42.1",
|
"windows-targets",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-sys"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
|
|
||||||
dependencies = [
|
|
||||||
"windows-targets 0.48.0",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1871,28 +1524,13 @@ version = "0.42.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
|
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"windows_aarch64_gnullvm 0.42.1",
|
"windows_aarch64_gnullvm",
|
||||||
"windows_aarch64_msvc 0.42.1",
|
"windows_aarch64_msvc",
|
||||||
"windows_i686_gnu 0.42.1",
|
"windows_i686_gnu",
|
||||||
"windows_i686_msvc 0.42.1",
|
"windows_i686_msvc",
|
||||||
"windows_x86_64_gnu 0.42.1",
|
"windows_x86_64_gnu",
|
||||||
"windows_x86_64_gnullvm 0.42.1",
|
"windows_x86_64_gnullvm",
|
||||||
"windows_x86_64_msvc 0.42.1",
|
"windows_x86_64_msvc",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-targets"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
|
|
||||||
dependencies = [
|
|
||||||
"windows_aarch64_gnullvm 0.48.0",
|
|
||||||
"windows_aarch64_msvc 0.48.0",
|
|
||||||
"windows_i686_gnu 0.48.0",
|
|
||||||
"windows_i686_msvc 0.48.0",
|
|
||||||
"windows_x86_64_gnu 0.48.0",
|
|
||||||
"windows_x86_64_gnullvm 0.48.0",
|
|
||||||
"windows_x86_64_msvc 0.48.0",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1901,84 +1539,42 @@ version = "0.42.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
|
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_aarch64_gnullvm"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_aarch64_msvc"
|
name = "windows_aarch64_msvc"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
|
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_aarch64_msvc"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_i686_gnu"
|
name = "windows_i686_gnu"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
|
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_i686_gnu"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_i686_msvc"
|
name = "windows_i686_msvc"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
|
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_i686_msvc"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_x86_64_gnu"
|
name = "windows_x86_64_gnu"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
|
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_gnu"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_x86_64_gnullvm"
|
name = "windows_x86_64_gnullvm"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
|
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_gnullvm"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows_x86_64_msvc"
|
name = "windows_x86_64_msvc"
|
||||||
version = "0.42.1"
|
version = "0.42.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
|
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_msvc"
|
|
||||||
version = "0.48.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winnow"
|
name = "winnow"
|
||||||
version = "0.3.3"
|
version = "0.3.3"
|
||||||
@@ -1987,12 +1583,3 @@ checksum = "faf09497b8f8b5ac5d3bb4d05c0a99be20f26fd3d5f2db7b0716e946d5103658"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "winreg"
|
|
||||||
version = "0.10.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
|
|
||||||
dependencies = [
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|||||||
13
Cargo.toml
13
Cargo.toml
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.0.2-alpha3"
|
version = "1.0.1"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
@@ -14,12 +14,12 @@ rand = "0.8"
|
|||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
sha-1 = "0.10"
|
sha-1 = "0.10"
|
||||||
toml = "0.7"
|
toml = "0.7"
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = "1"
|
||||||
serde_derive = "1"
|
serde_derive = "1"
|
||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = {version = "0.33", features = ["visitor"] }
|
sqlparser = "0.33.0"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
env_logger = "0.10"
|
env_logger = "0.10"
|
||||||
@@ -34,7 +34,7 @@ hyper = { version = "0.14", features = ["full"] }
|
|||||||
phf = { version = "0.11.1", features = ["macros"] }
|
phf = { version = "0.11.1", features = ["macros"] }
|
||||||
exitcode = "1.1.2"
|
exitcode = "1.1.2"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
socket2 = { version = "0.5.3", features = ["all"] }
|
socket2 = { version = "0.4.7", features = ["all"] }
|
||||||
nix = "0.26.2"
|
nix = "0.26.2"
|
||||||
atomic_enum = "0.2.0"
|
atomic_enum = "0.2.0"
|
||||||
postgres-protocol = "0.6.5"
|
postgres-protocol = "0.6.5"
|
||||||
@@ -42,11 +42,6 @@ fallible-iterator = "0.2"
|
|||||||
pin-project = "1"
|
pin-project = "1"
|
||||||
webpki-roots = "0.23"
|
webpki-roots = "0.23"
|
||||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
||||||
trust-dns-resolver = "0.22.0"
|
|
||||||
tokio-test = "0.4.2"
|
|
||||||
serde_json = "1"
|
|
||||||
itertools = "0.10"
|
|
||||||
|
|
||||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||||
jemallocator = "0.5.0"
|
jemallocator = "0.5.0"
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load bal
|
|||||||
| Failover | **Stable** | Queries are automatically rerouted around broken replicas, validated by regular health checks. |
|
| Failover | **Stable** | Queries are automatically rerouted around broken replicas, validated by regular health checks. |
|
||||||
| Admin database statistics | **Stable** | Pooler statistics and administration via the `pgbouncer` and `pgcat` databases. |
|
| Admin database statistics | **Stable** | Pooler statistics and administration via the `pgbouncer` and `pgcat` databases. |
|
||||||
| Prometheus statistics | **Stable** | Statistics are reported via a HTTP endpoint for Prometheus. |
|
| Prometheus statistics | **Stable** | Statistics are reported via a HTTP endpoint for Prometheus. |
|
||||||
| SSL/TLS | **Stable** | Clients can connect to the pooler using TLS. Pooler can connect to Postgres servers using TLS. |
|
| Client TLS | **Stable** | Clients can connect to the pooler using TLS/SSL. |
|
||||||
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
|
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
|
||||||
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
|
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
|
||||||
| Auth passthrough | **Stable** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file.|
|
| Auth passthrough | **Stable** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file.|
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ x-common-env-pg:
|
|||||||
|
|
||||||
services:
|
services:
|
||||||
main:
|
main:
|
||||||
image: gcr.io/google_containers/pause:3.2
|
image: kubernetes/pause
|
||||||
ports:
|
ports:
|
||||||
- 6432
|
- 6432
|
||||||
|
|
||||||
@@ -64,7 +64,7 @@ services:
|
|||||||
<<: *common-env-pg
|
<<: *common-env-pg
|
||||||
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
|
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
|
||||||
PGPORT: 10432
|
PGPORT: 10432
|
||||||
command: ["postgres", "-p", "10432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
|
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
|
||||||
|
|
||||||
toxiproxy:
|
toxiproxy:
|
||||||
build: .
|
build: .
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
# This is an example of the most basic config
|
|
||||||
# that will mimic what PgBouncer does in transaction mode with one server.
|
|
||||||
|
|
||||||
[general]
|
|
||||||
|
|
||||||
host = "0.0.0.0"
|
|
||||||
port = 6433
|
|
||||||
admin_username = "pgcat"
|
|
||||||
admin_password = "pgcat"
|
|
||||||
|
|
||||||
[pools.pgml.users.0]
|
|
||||||
username = "postgres"
|
|
||||||
password = "postgres"
|
|
||||||
pool_size = 10
|
|
||||||
min_pool_size = 1
|
|
||||||
pool_mode = "transaction"
|
|
||||||
|
|
||||||
[pools.pgml.shards.0]
|
|
||||||
servers = [
|
|
||||||
["127.0.0.1", 28815, "primary"]
|
|
||||||
]
|
|
||||||
database = "postgres"
|
|
||||||
107
pgcat.toml
107
pgcat.toml
@@ -77,58 +77,6 @@ admin_username = "admin_user"
|
|||||||
# Password to access the virtual administrative database
|
# Password to access the virtual administrative database
|
||||||
admin_password = "admin_pass"
|
admin_password = "admin_pass"
|
||||||
|
|
||||||
# Default plugins that are configured on all pools.
|
|
||||||
[plugins]
|
|
||||||
|
|
||||||
# Prewarmer plugin that runs queries on server startup, before giving the connection
|
|
||||||
# to the client.
|
|
||||||
[plugins.prewarmer]
|
|
||||||
enabled = false
|
|
||||||
queries = [
|
|
||||||
"SELECT pg_prewarm('pgbench_accounts')",
|
|
||||||
]
|
|
||||||
|
|
||||||
# Log all queries to stdout.
|
|
||||||
[plugins.query_logger]
|
|
||||||
enabled = false
|
|
||||||
|
|
||||||
# Block access to tables that Postgres does not allow us to control.
|
|
||||||
[plugins.table_access]
|
|
||||||
enabled = false
|
|
||||||
tables = [
|
|
||||||
"pg_user",
|
|
||||||
"pg_roles",
|
|
||||||
"pg_database",
|
|
||||||
]
|
|
||||||
|
|
||||||
# Intercept user queries and give a fake reply.
|
|
||||||
[plugins.intercept]
|
|
||||||
enabled = true
|
|
||||||
|
|
||||||
[plugins.intercept.queries.0]
|
|
||||||
|
|
||||||
query = "select current_database() as a, current_schemas(false) as b"
|
|
||||||
schema = [
|
|
||||||
["a", "text"],
|
|
||||||
["b", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "{public}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
[plugins.intercept.queries.1]
|
|
||||||
|
|
||||||
query = "select current_database(), current_schema(), current_user"
|
|
||||||
schema = [
|
|
||||||
["current_database", "text"],
|
|
||||||
["current_schema", "text"],
|
|
||||||
["current_user", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "public", "${USER}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
# pool configs are structured as pool.<pool_name>
|
# pool configs are structured as pool.<pool_name>
|
||||||
# the pool_name is what clients use as database name when connecting.
|
# the pool_name is what clients use as database name when connecting.
|
||||||
# For a pool named `sharded_db`, clients access that pool using connection string like
|
# For a pool named `sharded_db`, clients access that pool using connection string like
|
||||||
@@ -198,61 +146,6 @@ idle_timeout = 40000
|
|||||||
# Connect timeout can be overwritten in the pool
|
# Connect timeout can be overwritten in the pool
|
||||||
connect_timeout = 3000
|
connect_timeout = 3000
|
||||||
|
|
||||||
# When enabled, ip resolutions for server connections specified using hostnames will be cached
|
|
||||||
# and checked for changes every `dns_max_ttl` seconds. If a change in the host resolution is found
|
|
||||||
# old ip connections are closed (gracefully) and new connections will start using new ip.
|
|
||||||
# dns_cache_enabled = false
|
|
||||||
|
|
||||||
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
|
|
||||||
# dns_max_ttl = 30
|
|
||||||
|
|
||||||
# Plugins can be configured on a pool-per-pool basis. This overrides the global plugins setting,
|
|
||||||
# so all plugins have to be configured here again.
|
|
||||||
[pool.sharded_db.plugins]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.prewarmer]
|
|
||||||
enabled = true
|
|
||||||
queries = [
|
|
||||||
"SELECT pg_prewarm('pgbench_accounts')",
|
|
||||||
]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.query_logger]
|
|
||||||
enabled = false
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.table_access]
|
|
||||||
enabled = false
|
|
||||||
tables = [
|
|
||||||
"pg_user",
|
|
||||||
"pg_roles",
|
|
||||||
"pg_database",
|
|
||||||
]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.intercept]
|
|
||||||
enabled = true
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.intercept.queries.0]
|
|
||||||
|
|
||||||
query = "select current_database() as a, current_schemas(false) as b"
|
|
||||||
schema = [
|
|
||||||
["a", "text"],
|
|
||||||
["b", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "{public}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
[pools.sharded_db.plugins.intercept.queries.1]
|
|
||||||
|
|
||||||
query = "select current_database(), current_schema(), current_user"
|
|
||||||
schema = [
|
|
||||||
["current_database", "text"],
|
|
||||||
["current_schema", "text"],
|
|
||||||
["current_user", "text"],
|
|
||||||
]
|
|
||||||
result = [
|
|
||||||
["${DATABASE}", "public", "${USER}"],
|
|
||||||
]
|
|
||||||
|
|
||||||
# User configs are structured as pool.<pool_name>.users.<user_index>
|
# User configs are structured as pool.<pool_name>.users.<user_index>
|
||||||
# This section holds the credentials for users that may connect to this cluster
|
# This section holds the credentials for users that may connect to this cluster
|
||||||
[pools.sharded_db.users.0]
|
[pools.sharded_db.users.0]
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ use tokio::time::Instant;
|
|||||||
use crate::config::{get_config, reload_config, VERSION};
|
use crate::config::{get_config, reload_config, VERSION};
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::pool::ClientServerMap;
|
|
||||||
use crate::pool::{get_all_pools, get_pool};
|
use crate::pool::{get_all_pools, get_pool};
|
||||||
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
|
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
|
||||||
|
use crate::ClientServerMap;
|
||||||
|
|
||||||
pub fn generate_server_info_for_admin() -> BytesMut {
|
pub fn generate_server_info_for_admin() -> BytesMut {
|
||||||
let mut server_info = BytesMut::new();
|
let mut server_info = BytesMut::new();
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ use crate::auth_passthrough::refetch_auth_hash;
|
|||||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::plugins::PluginOutput;
|
|
||||||
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
|
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
|
||||||
use crate::query_router::{Command, QueryRouter};
|
use crate::query_router::{Command, QueryRouter};
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
@@ -766,9 +765,6 @@ where
|
|||||||
|
|
||||||
self.stats.register(self.stats.clone());
|
self.stats.register(self.stats.clone());
|
||||||
|
|
||||||
// Result returned by one of the plugins.
|
|
||||||
let mut plugin_output = None;
|
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
// or issue commands for our sharding and server selection protocol.
|
// or issue commands for our sharding and server selection protocol.
|
||||||
@@ -819,25 +815,7 @@ where
|
|||||||
|
|
||||||
'Q' => {
|
'Q' => {
|
||||||
if query_router.query_parser_enabled() {
|
if query_router.query_parser_enabled() {
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
query_router.infer(&message);
|
||||||
let plugin_result = query_router.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
match plugin_result {
|
|
||||||
Ok(PluginOutput::Deny(error)) => {
|
|
||||||
error_response(&mut self.write, &error).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(PluginOutput::Intercept(result)) => {
|
|
||||||
write_all(&mut self.write, result).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = query_router.infer(&ast);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -845,13 +823,7 @@ where
|
|||||||
self.buffer.put(&message[..]);
|
self.buffer.put(&message[..]);
|
||||||
|
|
||||||
if query_router.query_parser_enabled() {
|
if query_router.query_parser_enabled() {
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
query_router.infer(&message);
|
||||||
if let Ok(output) = query_router.execute_plugins(&ast).await {
|
|
||||||
plugin_output = Some(output);
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = query_router.infer(&ast);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
@@ -885,18 +857,6 @@ where
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check on plugin results.
|
|
||||||
match plugin_output {
|
|
||||||
Some(PluginOutput::Deny(error)) => {
|
|
||||||
self.buffer.clear();
|
|
||||||
error_response(&mut self.write, &error).await?;
|
|
||||||
plugin_output = None;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Get a pool instance referenced by the most up-to-date
|
// Get a pool instance referenced by the most up-to-date
|
||||||
// pointer. This ensures we always read the latest config
|
// pointer. This ensures we always read the latest config
|
||||||
// when starting a query.
|
// when starting a query.
|
||||||
@@ -1125,27 +1085,6 @@ where
|
|||||||
match code {
|
match code {
|
||||||
// Query
|
// Query
|
||||||
'Q' => {
|
'Q' => {
|
||||||
if query_router.query_parser_enabled() {
|
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
|
||||||
let plugin_result = query_router.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
match plugin_result {
|
|
||||||
Ok(PluginOutput::Deny(error)) => {
|
|
||||||
error_response(&mut self.write, &error).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(PluginOutput::Intercept(result)) => {
|
|
||||||
write_all(&mut self.write, result).await?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = query_router.infer(&ast);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
debug!("Sending query to server");
|
debug!("Sending query to server");
|
||||||
|
|
||||||
self.send_and_receive_loop(
|
self.send_and_receive_loop(
|
||||||
@@ -1185,14 +1124,6 @@ where
|
|||||||
// Parse
|
// Parse
|
||||||
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
||||||
'P' => {
|
'P' => {
|
||||||
if query_router.query_parser_enabled() {
|
|
||||||
if let Ok(ast) = QueryRouter::parse(&message) {
|
|
||||||
if let Ok(output) = query_router.execute_plugins(&ast).await {
|
|
||||||
plugin_output = Some(output);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.buffer.put(&message[..]);
|
self.buffer.put(&message[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1224,24 +1155,6 @@ where
|
|||||||
'S' => {
|
'S' => {
|
||||||
debug!("Sending query to server");
|
debug!("Sending query to server");
|
||||||
|
|
||||||
match plugin_output {
|
|
||||||
Some(PluginOutput::Deny(error)) => {
|
|
||||||
error_response(&mut self.write, &error).await?;
|
|
||||||
plugin_output = None;
|
|
||||||
self.buffer.clear();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(PluginOutput::Intercept(result)) => {
|
|
||||||
write_all(&mut self.write, result).await?;
|
|
||||||
plugin_output = None;
|
|
||||||
self.buffer.clear();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
self.buffer.put(&message[..]);
|
self.buffer.put(&message[..]);
|
||||||
|
|
||||||
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
||||||
|
|||||||
155
src/config.rs
155
src/config.rs
@@ -12,7 +12,6 @@ use std::sync::Arc;
|
|||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
use crate::dns_cache::CachedResolver;
|
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::pool::{ClientServerMap, ConnectionPool};
|
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||||
use crate::sharding::ShardingFunction;
|
use crate::sharding::ShardingFunction;
|
||||||
@@ -122,16 +121,6 @@ impl Default for Address {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for Address {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"[address: {}:{}][database: {}][user: {}]",
|
|
||||||
self.host, self.port, self.database, self.username
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We need to implement PartialEq by ourselves so we skip stats in the comparison
|
// We need to implement PartialEq by ourselves so we skip stats in the comparison
|
||||||
impl PartialEq for Address {
|
impl PartialEq for Address {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
@@ -245,8 +234,6 @@ pub struct General {
|
|||||||
pub port: u16,
|
pub port: u16,
|
||||||
|
|
||||||
pub enable_prometheus_exporter: Option<bool>,
|
pub enable_prometheus_exporter: Option<bool>,
|
||||||
|
|
||||||
#[serde(default = "General::default_prometheus_exporter_port")]
|
|
||||||
pub prometheus_exporter_port: i16,
|
pub prometheus_exporter_port: i16,
|
||||||
|
|
||||||
#[serde(default = "General::default_connect_timeout")]
|
#[serde(default = "General::default_connect_timeout")]
|
||||||
@@ -268,12 +255,6 @@ pub struct General {
|
|||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
pub log_client_disconnections: bool,
|
pub log_client_disconnections: bool,
|
||||||
|
|
||||||
#[serde(default)] // False
|
|
||||||
pub dns_cache_enabled: bool,
|
|
||||||
|
|
||||||
#[serde(default = "General::default_dns_max_ttl")]
|
|
||||||
pub dns_max_ttl: u64,
|
|
||||||
|
|
||||||
#[serde(default = "General::default_shutdown_timeout")]
|
#[serde(default = "General::default_shutdown_timeout")]
|
||||||
pub shutdown_timeout: u64,
|
pub shutdown_timeout: u64,
|
||||||
|
|
||||||
@@ -310,10 +291,6 @@ pub struct General {
|
|||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
|
|
||||||
#[serde(default = "General::default_validate_config")]
|
|
||||||
pub validate_config: bool,
|
|
||||||
|
|
||||||
// Support for auth query
|
|
||||||
pub auth_query: Option<String>,
|
pub auth_query: Option<String>,
|
||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
@@ -359,10 +336,6 @@ impl General {
|
|||||||
60000
|
60000
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_dns_max_ttl() -> u64 {
|
|
||||||
30
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_healthcheck_timeout() -> u64 {
|
pub fn default_healthcheck_timeout() -> u64 {
|
||||||
1000
|
1000
|
||||||
}
|
}
|
||||||
@@ -382,14 +355,6 @@ impl General {
|
|||||||
pub fn default_idle_client_in_transaction_timeout() -> u64 {
|
pub fn default_idle_client_in_transaction_timeout() -> u64 {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_validate_config() -> bool {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_prometheus_exporter_port() -> i16 {
|
|
||||||
9930
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for General {
|
impl Default for General {
|
||||||
@@ -413,8 +378,6 @@ impl Default for General {
|
|||||||
log_client_connections: false,
|
log_client_connections: false,
|
||||||
log_client_disconnections: false,
|
log_client_disconnections: false,
|
||||||
autoreload: None,
|
autoreload: None,
|
||||||
dns_cache_enabled: false,
|
|
||||||
dns_max_ttl: Self::default_dns_max_ttl(),
|
|
||||||
tls_certificate: None,
|
tls_certificate: None,
|
||||||
tls_private_key: None,
|
tls_private_key: None,
|
||||||
server_tls: false,
|
server_tls: false,
|
||||||
@@ -425,7 +388,6 @@ impl Default for General {
|
|||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
server_lifetime: 1000 * 3600 * 24, // 24 hours,
|
server_lifetime: 1000 * 3600 * 24, // 24 hours,
|
||||||
validate_config: true,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -478,7 +440,6 @@ pub struct Pool {
|
|||||||
#[serde(default = "Pool::default_load_balancing_mode")]
|
#[serde(default = "Pool::default_load_balancing_mode")]
|
||||||
pub load_balancing_mode: LoadBalancingMode,
|
pub load_balancing_mode: LoadBalancingMode,
|
||||||
|
|
||||||
#[serde(default = "Pool::default_default_role")]
|
|
||||||
pub default_role: String,
|
pub default_role: String,
|
||||||
|
|
||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
@@ -493,7 +454,6 @@ pub struct Pool {
|
|||||||
|
|
||||||
pub server_lifetime: Option<u64>,
|
pub server_lifetime: Option<u64>,
|
||||||
|
|
||||||
#[serde(default = "Pool::default_sharding_function")]
|
|
||||||
pub sharding_function: ShardingFunction,
|
pub sharding_function: ShardingFunction,
|
||||||
|
|
||||||
#[serde(default = "Pool::default_automatic_sharding_key")]
|
#[serde(default = "Pool::default_automatic_sharding_key")]
|
||||||
@@ -507,7 +467,6 @@ pub struct Pool {
|
|||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
|
|
||||||
pub plugins: Option<Plugins>,
|
|
||||||
pub shards: BTreeMap<String, Shard>,
|
pub shards: BTreeMap<String, Shard>,
|
||||||
pub users: BTreeMap<String, User>,
|
pub users: BTreeMap<String, User>,
|
||||||
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
|
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
|
||||||
@@ -540,14 +499,6 @@ impl Pool {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_default_role() -> String {
|
|
||||||
"any".into()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_sharding_function() -> ShardingFunction {
|
|
||||||
ShardingFunction::PgBigintHash
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn validate(&mut self) -> Result<(), Error> {
|
pub fn validate(&mut self) -> Result<(), Error> {
|
||||||
match self.default_role.as_ref() {
|
match self.default_role.as_ref() {
|
||||||
"any" => (),
|
"any" => (),
|
||||||
@@ -636,7 +587,6 @@ impl Default for Pool {
|
|||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
server_lifetime: None,
|
server_lifetime: None,
|
||||||
plugins: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -715,76 +665,6 @@ impl Default for Shard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Plugins {
|
|
||||||
pub intercept: Option<Intercept>,
|
|
||||||
pub table_access: Option<TableAccess>,
|
|
||||||
pub query_logger: Option<QueryLogger>,
|
|
||||||
pub prewarmer: Option<Prewarmer>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for Plugins {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}",
|
|
||||||
self.intercept.is_some(),
|
|
||||||
self.table_access.is_some(),
|
|
||||||
self.query_logger.is_some(),
|
|
||||||
self.prewarmer.is_some(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Intercept {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub queries: BTreeMap<String, Query>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct TableAccess {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub tables: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct QueryLogger {
|
|
||||||
pub enabled: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Prewarmer {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub queries: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Intercept {
|
|
||||||
pub fn substitute(&mut self, db: &str, user: &str) {
|
|
||||||
for (_, query) in self.queries.iter_mut() {
|
|
||||||
query.substitute(db, user);
|
|
||||||
query.query = query.query.to_ascii_lowercase();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
|
|
||||||
pub struct Query {
|
|
||||||
pub query: String,
|
|
||||||
pub schema: Vec<Vec<String>>,
|
|
||||||
pub result: Vec<Vec<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Query {
|
|
||||||
pub fn substitute(&mut self, db: &str, user: &str) {
|
|
||||||
for col in self.result.iter_mut() {
|
|
||||||
for i in 0..col.len() {
|
|
||||||
col[i] = col[i].replace("${USER}", user).replace("${DATABASE}", db);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Configuration wrapper.
|
/// Configuration wrapper.
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -802,13 +682,7 @@ pub struct Config {
|
|||||||
#[serde(default = "Config::default_path")]
|
#[serde(default = "Config::default_path")]
|
||||||
pub path: String,
|
pub path: String,
|
||||||
|
|
||||||
// General and global settings.
|
|
||||||
pub general: General,
|
pub general: General,
|
||||||
|
|
||||||
// Plugins that should run in all pools.
|
|
||||||
pub plugins: Option<Plugins>,
|
|
||||||
|
|
||||||
// Connection pools.
|
|
||||||
pub pools: HashMap<String, Pool>,
|
pub pools: HashMap<String, Pool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -846,7 +720,6 @@ impl Default for Config {
|
|||||||
path: Self::default_path(),
|
path: Self::default_path(),
|
||||||
general: General::default(),
|
general: General::default(),
|
||||||
pools: HashMap::default(),
|
pools: HashMap::default(),
|
||||||
plugins: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -993,13 +866,6 @@ impl Config {
|
|||||||
"Server TLS certificate verification: {}",
|
"Server TLS certificate verification: {}",
|
||||||
self.general.verify_server_certificate
|
self.general.verify_server_certificate
|
||||||
);
|
);
|
||||||
info!(
|
|
||||||
"Plugins: {}",
|
|
||||||
match self.plugins {
|
|
||||||
Some(ref plugins) => plugins.to_string(),
|
|
||||||
None => "not configured".into(),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
for (pool_name, pool_config) in &self.pools {
|
for (pool_name, pool_config) in &self.pools {
|
||||||
// TODO: Make this output prettier (maybe a table?)
|
// TODO: Make this output prettier (maybe a table?)
|
||||||
@@ -1066,14 +932,6 @@ impl Config {
|
|||||||
None => "default".to_string(),
|
None => "default".to_string(),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
info!(
|
|
||||||
"[pool: {}] Plugins: {}",
|
|
||||||
pool_name,
|
|
||||||
match pool_config.plugins {
|
|
||||||
Some(ref plugins) => plugins.to_string(),
|
|
||||||
None => "not configured".into(),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
for user in &pool_config.users {
|
for user in &pool_config.users {
|
||||||
info!(
|
info!(
|
||||||
@@ -1253,7 +1111,6 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
|||||||
|
|
||||||
pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, Error> {
|
pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, Error> {
|
||||||
let old_config = get_config();
|
let old_config = get_config();
|
||||||
|
|
||||||
match parse(&old_config.path).await {
|
match parse(&old_config.path).await {
|
||||||
Ok(()) => (),
|
Ok(()) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -1261,18 +1118,14 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E
|
|||||||
return Err(Error::BadConfig);
|
return Err(Error::BadConfig);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_config = get_config();
|
let new_config = get_config();
|
||||||
|
|
||||||
match CachedResolver::from_config().await {
|
if old_config.pools != new_config.pools {
|
||||||
Ok(_) => (),
|
info!("Pool configuration changed");
|
||||||
Err(err) => error!("DNS cache reinitialization error: {:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
if old_config != new_config {
|
|
||||||
info!("Config changed, reloading");
|
|
||||||
ConnectionPool::from_config(client_server_map).await?;
|
ConnectionPool::from_config(client_server_map).await?;
|
||||||
Ok(true)
|
Ok(true)
|
||||||
|
} else if old_config != new_config {
|
||||||
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|||||||
410
src/dns_cache.rs
410
src/dns_cache.rs
@@ -1,410 +0,0 @@
|
|||||||
use crate::config::get_config;
|
|
||||||
use crate::errors::Error;
|
|
||||||
use arc_swap::ArcSwap;
|
|
||||||
use log::{debug, error, info, warn};
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use std::collections::{HashMap, HashSet};
|
|
||||||
use std::io;
|
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::RwLock;
|
|
||||||
use tokio::time::{sleep, Duration};
|
|
||||||
use trust_dns_resolver::error::{ResolveError, ResolveResult};
|
|
||||||
use trust_dns_resolver::lookup_ip::LookupIp;
|
|
||||||
use trust_dns_resolver::TokioAsyncResolver;
|
|
||||||
|
|
||||||
/// Cached Resolver Globally available
|
|
||||||
pub static CACHED_RESOLVER: Lazy<ArcSwap<CachedResolver>> =
|
|
||||||
Lazy::new(|| ArcSwap::from_pointee(CachedResolver::default()));
|
|
||||||
|
|
||||||
// Ip addressed are returned as a set of addresses
|
|
||||||
// so we can compare.
|
|
||||||
#[derive(Clone, PartialEq, Debug)]
|
|
||||||
pub struct AddrSet {
|
|
||||||
set: HashSet<IpAddr>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AddrSet {
|
|
||||||
fn new() -> AddrSet {
|
|
||||||
AddrSet {
|
|
||||||
set: HashSet::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<LookupIp> for AddrSet {
|
|
||||||
fn from(lookup_ip: LookupIp) -> Self {
|
|
||||||
let mut addr_set = AddrSet::new();
|
|
||||||
for address in lookup_ip.iter() {
|
|
||||||
addr_set.set.insert(address);
|
|
||||||
}
|
|
||||||
addr_set
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
///
|
|
||||||
/// A CachedResolver is a DNS resolution cache mechanism with customizable expiration time.
|
|
||||||
///
|
|
||||||
/// The system works as follows:
|
|
||||||
///
|
|
||||||
/// When a host is to be resolved, if we have not resolved it before, a new resolution is
|
|
||||||
/// executed and stored in the internal cache. Concurrently, every `dns_max_ttl` time, the
|
|
||||||
/// cache is refreshed.
|
|
||||||
///
|
|
||||||
/// # Example:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
|
|
||||||
///
|
|
||||||
/// # tokio_test::block_on(async {
|
|
||||||
/// let config = CachedResolverConfig::default();
|
|
||||||
/// let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
/// let addrset = resolver.lookup_ip("www.example.com.").await.unwrap();
|
|
||||||
/// # })
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// // Now the ip resolution is stored in local cache and subsequent
|
|
||||||
/// // calls will be returned from cache. Also, the cache is refreshed
|
|
||||||
/// // and updated every 10 seconds.
|
|
||||||
///
|
|
||||||
/// // You can now check if an 'old' lookup differs from what it's currently
|
|
||||||
/// // store in cache by using `has_changed`.
|
|
||||||
/// resolver.has_changed("www.example.com.", addrset)
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct CachedResolver {
|
|
||||||
// The configuration of the cached_resolver.
|
|
||||||
config: CachedResolverConfig,
|
|
||||||
|
|
||||||
// This is the hash that contains the hash.
|
|
||||||
data: Option<RwLock<HashMap<String, AddrSet>>>,
|
|
||||||
|
|
||||||
// The resolver to be used for DNS queries.
|
|
||||||
resolver: Option<TokioAsyncResolver>,
|
|
||||||
|
|
||||||
// The RefreshLoop
|
|
||||||
refresh_loop: RwLock<Option<tokio::task::JoinHandle<()>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
///
|
|
||||||
/// Configuration
|
|
||||||
#[derive(Clone, Debug, Default, PartialEq)]
|
|
||||||
pub struct CachedResolverConfig {
|
|
||||||
/// Amount of time in secods that a resolved dns address is considered stale.
|
|
||||||
dns_max_ttl: u64,
|
|
||||||
|
|
||||||
/// Enabled or disabled? (this is so we can reload config)
|
|
||||||
enabled: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CachedResolverConfig {
|
|
||||||
fn new(dns_max_ttl: u64, enabled: bool) -> Self {
|
|
||||||
CachedResolverConfig {
|
|
||||||
dns_max_ttl,
|
|
||||||
enabled,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<crate::config::Config> for CachedResolverConfig {
|
|
||||||
fn from(config: crate::config::Config) -> Self {
|
|
||||||
CachedResolverConfig::new(config.general.dns_max_ttl, config.general.dns_cache_enabled)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CachedResolver {
|
|
||||||
///
|
|
||||||
/// Returns a new Arc<CachedResolver> based on passed configuration.
|
|
||||||
/// It also starts the loop that will refresh cache entries.
|
|
||||||
///
|
|
||||||
/// # Arguments:
|
|
||||||
///
|
|
||||||
/// * `config` - The `CachedResolverConfig` to be used to create the resolver.
|
|
||||||
///
|
|
||||||
/// # Example:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
|
|
||||||
///
|
|
||||||
/// # tokio_test::block_on(async {
|
|
||||||
/// let config = CachedResolverConfig::default();
|
|
||||||
/// let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
/// # })
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
pub async fn new(
|
|
||||||
config: CachedResolverConfig,
|
|
||||||
data: Option<HashMap<String, AddrSet>>,
|
|
||||||
) -> Result<Arc<Self>, io::Error> {
|
|
||||||
// Construct a new Resolver with default configuration options
|
|
||||||
let resolver = Some(TokioAsyncResolver::tokio_from_system_conf()?);
|
|
||||||
|
|
||||||
let data = if let Some(hash) = data {
|
|
||||||
Some(RwLock::new(hash))
|
|
||||||
} else {
|
|
||||||
Some(RwLock::new(HashMap::new()))
|
|
||||||
};
|
|
||||||
|
|
||||||
let instance = Arc::new(Self {
|
|
||||||
config,
|
|
||||||
resolver,
|
|
||||||
data,
|
|
||||||
refresh_loop: RwLock::new(None),
|
|
||||||
});
|
|
||||||
|
|
||||||
if instance.enabled() {
|
|
||||||
info!("Scheduling DNS refresh loop");
|
|
||||||
let refresh_loop = tokio::task::spawn({
|
|
||||||
let instance = instance.clone();
|
|
||||||
async move {
|
|
||||||
instance.refresh_dns_entries_loop().await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
*(instance.refresh_loop.write().unwrap()) = Some(refresh_loop);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(instance)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn enabled(&self) -> bool {
|
|
||||||
self.config.enabled
|
|
||||||
}
|
|
||||||
|
|
||||||
// Schedules the refresher
|
|
||||||
async fn refresh_dns_entries_loop(&self) {
|
|
||||||
let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap();
|
|
||||||
let interval = Duration::from_secs(self.config.dns_max_ttl);
|
|
||||||
loop {
|
|
||||||
debug!("Begin refreshing cached DNS addresses.");
|
|
||||||
// To minimize the time we hold the lock, we first create
|
|
||||||
// an array with keys.
|
|
||||||
let mut hostnames: Vec<String> = Vec::new();
|
|
||||||
{
|
|
||||||
if let Some(ref data) = self.data {
|
|
||||||
for hostname in data.read().unwrap().keys() {
|
|
||||||
hostnames.push(hostname.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for hostname in hostnames.iter() {
|
|
||||||
let addrset = self
|
|
||||||
.fetch_from_cache(hostname.as_str())
|
|
||||||
.expect("Could not obtain expected address from cache, this should not happen");
|
|
||||||
|
|
||||||
match resolver.lookup_ip(hostname).await {
|
|
||||||
Ok(lookup_ip) => {
|
|
||||||
let new_addrset = AddrSet::from(lookup_ip);
|
|
||||||
debug!(
|
|
||||||
"Obtained address for host ({}) -> ({:?})",
|
|
||||||
hostname, new_addrset
|
|
||||||
);
|
|
||||||
|
|
||||||
if addrset != new_addrset {
|
|
||||||
debug!(
|
|
||||||
"Addr changed from {:?} to {:?} updating cache.",
|
|
||||||
addrset, new_addrset
|
|
||||||
);
|
|
||||||
self.store_in_cache(hostname, new_addrset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!(
|
|
||||||
"There was an error trying to resolv {}: ({}).",
|
|
||||||
hostname, err
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
debug!("Finished refreshing cached DNS addresses.");
|
|
||||||
sleep(interval).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a `AddrSet` given the specified hostname.
|
|
||||||
///
|
|
||||||
/// This method first tries to fetch the value from the cache, if it misses
|
|
||||||
/// then it is resolved and stored in the cache. TTL from records is ignored.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `host` - A string slice referencing the hostname to be resolved.
|
|
||||||
///
|
|
||||||
/// # Example:
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use pgcat::dns_cache::{CachedResolverConfig, CachedResolver};
|
|
||||||
///
|
|
||||||
/// # tokio_test::block_on(async {
|
|
||||||
/// let config = CachedResolverConfig::default();
|
|
||||||
/// let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
/// let response = resolver.lookup_ip("www.google.com.");
|
|
||||||
/// # })
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
pub async fn lookup_ip(&self, host: &str) -> ResolveResult<AddrSet> {
|
|
||||||
debug!("Lookup up {} in cache", host);
|
|
||||||
match self.fetch_from_cache(host) {
|
|
||||||
Some(addr_set) => {
|
|
||||||
debug!("Cache hit!");
|
|
||||||
Ok(addr_set)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
debug!("Not found, executing a dns query!");
|
|
||||||
if let Some(ref resolver) = self.resolver {
|
|
||||||
let addr_set = AddrSet::from(resolver.lookup_ip(host).await?);
|
|
||||||
debug!("Obtained: {:?}", addr_set);
|
|
||||||
self.store_in_cache(host, addr_set.clone());
|
|
||||||
Ok(addr_set)
|
|
||||||
} else {
|
|
||||||
Err(ResolveError::from("No resolver available"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Returns true if the stored host resolution differs from the AddrSet passed.
|
|
||||||
pub fn has_changed(&self, host: &str, addr_set: &AddrSet) -> bool {
|
|
||||||
if let Some(fetched_addr_set) = self.fetch_from_cache(host) {
|
|
||||||
return fetched_addr_set != *addr_set;
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetches an AddrSet from the inner cache adquiring the read lock.
|
|
||||||
fn fetch_from_cache(&self, key: &str) -> Option<AddrSet> {
|
|
||||||
if let Some(ref hash) = self.data {
|
|
||||||
if let Some(addr_set) = hash.read().unwrap().get(key) {
|
|
||||||
return Some(addr_set.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets up the global CACHED_RESOLVER static variable so we can globally use DNS
|
|
||||||
// cache.
|
|
||||||
pub async fn from_config() -> Result<(), Error> {
|
|
||||||
let cached_resolver = CACHED_RESOLVER.load();
|
|
||||||
let desired_config = CachedResolverConfig::from(get_config());
|
|
||||||
|
|
||||||
if cached_resolver.config != desired_config {
|
|
||||||
if let Some(ref refresh_loop) = *(cached_resolver.refresh_loop.write().unwrap()) {
|
|
||||||
warn!("Killing Dnscache refresh loop as its configuration is being reloaded");
|
|
||||||
refresh_loop.abort()
|
|
||||||
}
|
|
||||||
let new_resolver = if let Some(ref data) = cached_resolver.data {
|
|
||||||
let data = Some(data.read().unwrap().clone());
|
|
||||||
CachedResolver::new(desired_config, data).await
|
|
||||||
} else {
|
|
||||||
CachedResolver::new(desired_config, None).await
|
|
||||||
};
|
|
||||||
|
|
||||||
match new_resolver {
|
|
||||||
Ok(ok) => {
|
|
||||||
CACHED_RESOLVER.store(ok);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let message = format!("Error setting up cached_resolver. Error: {:?}, will continue without this feature.", err);
|
|
||||||
Err(Error::DNSCachedError(message))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stores the AddrSet in cache adquiring the write lock.
|
|
||||||
fn store_in_cache(&self, host: &str, addr_set: AddrSet) {
|
|
||||||
if let Some(ref data) = self.data {
|
|
||||||
data.write().unwrap().insert(host.to_string(), addr_set);
|
|
||||||
} else {
|
|
||||||
error!("Could not insert, Hash not initialized");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use trust_dns_resolver::error::ResolveError;
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn new() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await;
|
|
||||||
assert!(resolver.is_ok());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn lookup_ip() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let response = resolver.lookup_ip("www.google.com.").await;
|
|
||||||
assert!(response.is_ok());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn has_changed() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "www.google.com.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
let addr_set = response.unwrap();
|
|
||||||
assert!(!resolver.has_changed(hostname, &addr_set));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn unknown_host() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "www.idontexists.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
assert!(matches!(response, Err(ResolveError { .. })));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn incorrect_address() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "w ww.idontexists.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
assert!(matches!(response, Err(ResolveError { .. })));
|
|
||||||
assert!(!resolver.has_changed(hostname, &AddrSet::new()));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
// Ok, this test is based on the fact that google does DNS RR
|
|
||||||
// and does not responds with every available ip everytime, so
|
|
||||||
// if I cache here, it will miss after one cache iteration or two.
|
|
||||||
async fn thread() {
|
|
||||||
let config = CachedResolverConfig {
|
|
||||||
dns_max_ttl: 10,
|
|
||||||
enabled: true,
|
|
||||||
};
|
|
||||||
let resolver = CachedResolver::new(config, None).await.unwrap();
|
|
||||||
let hostname = "www.google.com.";
|
|
||||||
let response = resolver.lookup_ip(hostname).await;
|
|
||||||
let addr_set = response.unwrap();
|
|
||||||
assert!(!resolver.has_changed(hostname, &addr_set));
|
|
||||||
let resolver_for_refresher = resolver.clone();
|
|
||||||
let _thread_handle = tokio::task::spawn(async move {
|
|
||||||
resolver_for_refresher.refresh_dns_entries_loop().await;
|
|
||||||
});
|
|
||||||
assert!(!resolver.has_changed(hostname, &addr_set));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
//! Errors.
|
//! Errors.
|
||||||
|
|
||||||
/// Various errors.
|
/// Various errors.
|
||||||
#[derive(Debug, PartialEq, Clone)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
SocketError(String),
|
SocketError(String),
|
||||||
ClientSocketError(String, ClientIdentifier),
|
ClientSocketError(String, ClientIdentifier),
|
||||||
@@ -19,13 +19,10 @@ pub enum Error {
|
|||||||
ClientError(String),
|
ClientError(String),
|
||||||
TlsError,
|
TlsError,
|
||||||
StatementTimeout,
|
StatementTimeout,
|
||||||
DNSCachedError(String),
|
|
||||||
ShuttingDown,
|
ShuttingDown,
|
||||||
ParseBytesError(String),
|
ParseBytesError(String),
|
||||||
AuthError(String),
|
AuthError(String),
|
||||||
AuthPassthroughError(String),
|
AuthPassthroughError(String),
|
||||||
UnsupportedStatement,
|
|
||||||
QueryRouterParserError(String),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Debug)]
|
#[derive(Clone, PartialEq, Debug)]
|
||||||
|
|||||||
@@ -1,17 +1,11 @@
|
|||||||
pub mod admin;
|
|
||||||
pub mod auth_passthrough;
|
pub mod auth_passthrough;
|
||||||
pub mod client;
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod constants;
|
pub mod constants;
|
||||||
pub mod dns_cache;
|
|
||||||
pub mod errors;
|
pub mod errors;
|
||||||
pub mod messages;
|
pub mod messages;
|
||||||
pub mod mirrors;
|
pub mod mirrors;
|
||||||
pub mod multi_logger;
|
pub mod multi_logger;
|
||||||
pub mod plugins;
|
|
||||||
pub mod pool;
|
pub mod pool;
|
||||||
pub mod prometheus;
|
|
||||||
pub mod query_router;
|
|
||||||
pub mod scram;
|
pub mod scram;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
pub mod sharding;
|
pub mod sharding;
|
||||||
|
|||||||
46
src/main.rs
46
src/main.rs
@@ -36,7 +36,6 @@ extern crate sqlparser;
|
|||||||
extern crate tokio;
|
extern crate tokio;
|
||||||
extern crate tokio_rustls;
|
extern crate tokio_rustls;
|
||||||
extern crate toml;
|
extern crate toml;
|
||||||
extern crate trust_dns_resolver;
|
|
||||||
|
|
||||||
#[cfg(not(target_env = "msvc"))]
|
#[cfg(not(target_env = "msvc"))]
|
||||||
use jemallocator::Jemalloc;
|
use jemallocator::Jemalloc;
|
||||||
@@ -61,19 +60,36 @@ use std::str::FromStr;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use pgcat::config::{get_config, reload_config, VERSION};
|
mod admin;
|
||||||
use pgcat::dns_cache;
|
mod auth_passthrough;
|
||||||
use pgcat::messages::configure_socket;
|
mod client;
|
||||||
use pgcat::pool::{ClientServerMap, ConnectionPool};
|
mod config;
|
||||||
use pgcat::prometheus::start_metric_server;
|
mod constants;
|
||||||
use pgcat::stats::{Collector, Reporter, REPORTER};
|
mod errors;
|
||||||
|
mod messages;
|
||||||
|
mod mirrors;
|
||||||
|
mod multi_logger;
|
||||||
|
mod pool;
|
||||||
|
mod prometheus;
|
||||||
|
mod query_router;
|
||||||
|
mod scram;
|
||||||
|
mod server;
|
||||||
|
mod sharding;
|
||||||
|
mod stats;
|
||||||
|
mod tls;
|
||||||
|
|
||||||
|
use crate::config::{get_config, reload_config, VERSION};
|
||||||
|
use crate::messages::configure_socket;
|
||||||
|
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||||
|
use crate::prometheus::start_metric_server;
|
||||||
|
use crate::stats::{Collector, Reporter, REPORTER};
|
||||||
|
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
pgcat::multi_logger::MultiLogger::init().unwrap();
|
multi_logger::MultiLogger::init().unwrap();
|
||||||
|
|
||||||
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
||||||
|
|
||||||
if !pgcat::query_router::QueryRouter::setup() {
|
if !query_router::QueryRouter::setup() {
|
||||||
error!("Could not setup query router");
|
error!("Could not setup query router");
|
||||||
std::process::exit(exitcode::CONFIG);
|
std::process::exit(exitcode::CONFIG);
|
||||||
}
|
}
|
||||||
@@ -91,7 +107,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let runtime = Builder::new_multi_thread().worker_threads(1).build()?;
|
let runtime = Builder::new_multi_thread().worker_threads(1).build()?;
|
||||||
|
|
||||||
runtime.block_on(async {
|
runtime.block_on(async {
|
||||||
match pgcat::config::parse(&config_file).await {
|
match config::parse(&config_file).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Config parse error: {:?}", err);
|
error!("Config parse error: {:?}", err);
|
||||||
@@ -150,12 +166,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
// Statistics reporting.
|
// Statistics reporting.
|
||||||
REPORTER.store(Arc::new(Reporter::default()));
|
REPORTER.store(Arc::new(Reporter::default()));
|
||||||
|
|
||||||
// Starts (if enabled) dns cache before pools initialization
|
|
||||||
match dns_cache::CachedResolver::from_config().await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => error!("DNS cache initialization error: {:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Connection pool that allows to query all shards and replicas.
|
// Connection pool that allows to query all shards and replicas.
|
||||||
match ConnectionPool::from_config(client_server_map.clone()).await {
|
match ConnectionPool::from_config(client_server_map.clone()).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
@@ -285,7 +295,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let start = chrono::offset::Utc::now().naive_utc();
|
let start = chrono::offset::Utc::now().naive_utc();
|
||||||
|
|
||||||
match pgcat::client::client_entrypoint(
|
match client::client_entrypoint(
|
||||||
socket,
|
socket,
|
||||||
client_server_map,
|
client_server_map,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
@@ -316,7 +326,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
match err {
|
match err {
|
||||||
pgcat::errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
|
errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
|
||||||
_ => warn!("Client disconnected with error {:?}", err),
|
_ => warn!("Client disconnected with error {:?}", err),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -20,10 +20,6 @@ pub enum DataType {
|
|||||||
Text,
|
Text,
|
||||||
Int4,
|
Int4,
|
||||||
Numeric,
|
Numeric,
|
||||||
Bool,
|
|
||||||
Oid,
|
|
||||||
AnyArray,
|
|
||||||
Any,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<&DataType> for i32 {
|
impl From<&DataType> for i32 {
|
||||||
@@ -32,10 +28,6 @@ impl From<&DataType> for i32 {
|
|||||||
DataType::Text => 25,
|
DataType::Text => 25,
|
||||||
DataType::Int4 => 23,
|
DataType::Int4 => 23,
|
||||||
DataType::Numeric => 1700,
|
DataType::Numeric => 1700,
|
||||||
DataType::Bool => 16,
|
|
||||||
DataType::Oid => 26,
|
|
||||||
DataType::AnyArray => 2277,
|
|
||||||
DataType::Any => 2276,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -451,10 +443,6 @@ pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
|
|||||||
DataType::Text => -1,
|
DataType::Text => -1,
|
||||||
DataType::Int4 => 4,
|
DataType::Int4 => 4,
|
||||||
DataType::Numeric => -1,
|
DataType::Numeric => -1,
|
||||||
DataType::Bool => 1,
|
|
||||||
DataType::Oid => 4,
|
|
||||||
DataType::AnyArray => -1,
|
|
||||||
DataType::Any => -1,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
row_desc.put_i16(type_size);
|
row_desc.put_i16(type_size);
|
||||||
@@ -493,29 +481,6 @@ pub fn data_row(row: &Vec<String>) -> BytesMut {
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_row_nullable(row: &Vec<Option<String>>) -> BytesMut {
|
|
||||||
let mut res = BytesMut::new();
|
|
||||||
let mut data_row = BytesMut::new();
|
|
||||||
|
|
||||||
data_row.put_i16(row.len() as i16);
|
|
||||||
|
|
||||||
for column in row {
|
|
||||||
if let Some(column) = column {
|
|
||||||
let column = column.as_bytes();
|
|
||||||
data_row.put_i32(column.len() as i32);
|
|
||||||
data_row.put_slice(column);
|
|
||||||
} else {
|
|
||||||
data_row.put_i32(-1 as i32);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res.put_u8(b'D');
|
|
||||||
res.put_i32(data_row.len() as i32 + 4);
|
|
||||||
res.put(data_row);
|
|
||||||
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a CommandComplete message.
|
/// Create a CommandComplete message.
|
||||||
pub fn command_complete(command: &str) -> BytesMut {
|
pub fn command_complete(command: &str) -> BytesMut {
|
||||||
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
||||||
|
|||||||
@@ -43,7 +43,6 @@ impl MirroredClient {
|
|||||||
ClientServerMap::default(),
|
ClientServerMap::default(),
|
||||||
Arc::new(PoolStats::new(identifier, cfg.clone())),
|
Arc::new(PoolStats::new(identifier, cfg.clone())),
|
||||||
Arc::new(RwLock::new(None)),
|
Arc::new(RwLock::new(None)),
|
||||||
None,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
Pool::builder()
|
Pool::builder()
|
||||||
|
|||||||
@@ -1,120 +0,0 @@
|
|||||||
//! The intercept plugin.
|
|
||||||
//!
|
|
||||||
//! It intercepts queries and returns fake results.
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::{BufMut, BytesMut};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use sqlparser::ast::Statement;
|
|
||||||
|
|
||||||
use log::debug;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
config::Intercept as InterceptConfig,
|
|
||||||
errors::Error,
|
|
||||||
messages::{command_complete, data_row_nullable, row_description, DataType},
|
|
||||||
plugins::{Plugin, PluginOutput},
|
|
||||||
query_router::QueryRouter,
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: use these structs for deserialization
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct Rule {
|
|
||||||
query: String,
|
|
||||||
schema: Vec<Column>,
|
|
||||||
result: Vec<Vec<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct Column {
|
|
||||||
name: String,
|
|
||||||
data_type: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The intercept plugin.
|
|
||||||
pub struct Intercept<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub config: &'a InterceptConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<'a> Plugin for Intercept<'a> {
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error> {
|
|
||||||
if !self.enabled || ast.is_empty() {
|
|
||||||
return Ok(PluginOutput::Allow);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut config = self.config.clone();
|
|
||||||
config.substitute(
|
|
||||||
&query_router.pool_settings().db,
|
|
||||||
&query_router.pool_settings().user.username,
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut result = BytesMut::new();
|
|
||||||
|
|
||||||
for q in ast {
|
|
||||||
// Normalization
|
|
||||||
let q = q.to_string().to_ascii_lowercase();
|
|
||||||
|
|
||||||
for (_, target) in config.queries.iter() {
|
|
||||||
if target.query.as_str() == q {
|
|
||||||
debug!("Intercepting query: {}", q);
|
|
||||||
|
|
||||||
let rd = target
|
|
||||||
.schema
|
|
||||||
.iter()
|
|
||||||
.map(|row| {
|
|
||||||
let name = &row[0];
|
|
||||||
let data_type = &row[1];
|
|
||||||
(
|
|
||||||
name.as_str(),
|
|
||||||
match data_type.as_str() {
|
|
||||||
"text" => DataType::Text,
|
|
||||||
"anyarray" => DataType::AnyArray,
|
|
||||||
"oid" => DataType::Oid,
|
|
||||||
"bool" => DataType::Bool,
|
|
||||||
"int4" => DataType::Int4,
|
|
||||||
_ => DataType::Any,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect::<Vec<(&str, DataType)>>();
|
|
||||||
|
|
||||||
result.put(row_description(&rd));
|
|
||||||
|
|
||||||
target.result.iter().for_each(|row| {
|
|
||||||
let row = row
|
|
||||||
.iter()
|
|
||||||
.map(|s| {
|
|
||||||
let s = s.as_str().to_string();
|
|
||||||
|
|
||||||
if s == "" {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(s)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect::<Vec<Option<String>>>();
|
|
||||||
result.put(data_row_nullable(&row));
|
|
||||||
});
|
|
||||||
|
|
||||||
result.put(command_complete("SELECT"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !result.is_empty() {
|
|
||||||
result.put_u8(b'Z');
|
|
||||||
result.put_i32(5);
|
|
||||||
result.put_u8(b'I');
|
|
||||||
|
|
||||||
return Ok(PluginOutput::Intercept(result));
|
|
||||||
} else {
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,44 +0,0 @@
|
|||||||
//! The plugin ecosystem.
|
|
||||||
//!
|
|
||||||
//! Currently plugins only grant access or deny access to the database for a particual query.
|
|
||||||
//! Example use cases:
|
|
||||||
//! - block known bad queries
|
|
||||||
//! - block access to system catalogs
|
|
||||||
//! - block dangerous modifications like `DROP TABLE`
|
|
||||||
//! - etc
|
|
||||||
//!
|
|
||||||
|
|
||||||
pub mod intercept;
|
|
||||||
pub mod prewarmer;
|
|
||||||
pub mod query_logger;
|
|
||||||
pub mod table_access;
|
|
||||||
|
|
||||||
use crate::{errors::Error, query_router::QueryRouter};
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::BytesMut;
|
|
||||||
use sqlparser::ast::Statement;
|
|
||||||
|
|
||||||
pub use intercept::Intercept;
|
|
||||||
pub use query_logger::QueryLogger;
|
|
||||||
pub use table_access::TableAccess;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
|
||||||
pub enum PluginOutput {
|
|
||||||
Allow,
|
|
||||||
Deny(String),
|
|
||||||
Overwrite(Vec<Statement>),
|
|
||||||
Intercept(BytesMut),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait Plugin {
|
|
||||||
// Run before the query is sent to the server.
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error>;
|
|
||||||
|
|
||||||
// TODO: run after the result is returned
|
|
||||||
// async fn callback(&mut self, query_router: &QueryRouter);
|
|
||||||
}
|
|
||||||
@@ -1,28 +0,0 @@
|
|||||||
//! Prewarm new connections before giving them to the client.
|
|
||||||
use crate::{errors::Error, server::Server};
|
|
||||||
use log::info;
|
|
||||||
|
|
||||||
pub struct Prewarmer<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub server: &'a mut Server,
|
|
||||||
pub queries: &'a Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Prewarmer<'a> {
|
|
||||||
pub async fn run(&mut self) -> Result<(), Error> {
|
|
||||||
if !self.enabled {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
for query in self.queries {
|
|
||||||
info!(
|
|
||||||
"{} Prewarning with query: `{}`",
|
|
||||||
self.server.address(),
|
|
||||||
query
|
|
||||||
);
|
|
||||||
self.server.query(&query).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
//! Log all queries to stdout (or somewhere else, why not).
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
errors::Error,
|
|
||||||
plugins::{Plugin, PluginOutput},
|
|
||||||
query_router::QueryRouter,
|
|
||||||
};
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use log::info;
|
|
||||||
use sqlparser::ast::Statement;
|
|
||||||
|
|
||||||
pub struct QueryLogger<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub user: &'a str,
|
|
||||||
pub db: &'a str,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<'a> Plugin for QueryLogger<'a> {
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
_query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error> {
|
|
||||||
if !self.enabled {
|
|
||||||
return Ok(PluginOutput::Allow);
|
|
||||||
}
|
|
||||||
|
|
||||||
let query = ast
|
|
||||||
.iter()
|
|
||||||
.map(|q| q.to_string())
|
|
||||||
.collect::<Vec<String>>()
|
|
||||||
.join("; ");
|
|
||||||
info!("[pool: {}][user: {}] {}", self.user, self.db, query);
|
|
||||||
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,59 +0,0 @@
|
|||||||
//! This query router plugin will check if the user can access a particular
|
|
||||||
//! table as part of their query. If they can't, the query will not be routed.
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use sqlparser::ast::{visit_relations, Statement};
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
errors::Error,
|
|
||||||
plugins::{Plugin, PluginOutput},
|
|
||||||
query_router::QueryRouter,
|
|
||||||
};
|
|
||||||
|
|
||||||
use log::debug;
|
|
||||||
|
|
||||||
use core::ops::ControlFlow;
|
|
||||||
|
|
||||||
pub struct TableAccess<'a> {
|
|
||||||
pub enabled: bool,
|
|
||||||
pub tables: &'a Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<'a> Plugin for TableAccess<'a> {
|
|
||||||
async fn run(
|
|
||||||
&mut self,
|
|
||||||
_query_router: &QueryRouter,
|
|
||||||
ast: &Vec<Statement>,
|
|
||||||
) -> Result<PluginOutput, Error> {
|
|
||||||
if !self.enabled {
|
|
||||||
return Ok(PluginOutput::Allow);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut found = None;
|
|
||||||
|
|
||||||
visit_relations(ast, |relation| {
|
|
||||||
let relation = relation.to_string();
|
|
||||||
let parts = relation.split(".").collect::<Vec<&str>>();
|
|
||||||
let table_name = parts.last().unwrap();
|
|
||||||
|
|
||||||
if self.tables.contains(&table_name.to_string()) {
|
|
||||||
found = Some(table_name.to_string());
|
|
||||||
ControlFlow::<()>::Break(())
|
|
||||||
} else {
|
|
||||||
ControlFlow::<()>::Continue(())
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Some(found) = found {
|
|
||||||
debug!("Blocking access to table \"{}\"", found);
|
|
||||||
|
|
||||||
Ok(PluginOutput::Deny(format!(
|
|
||||||
"permission for table \"{}\" denied",
|
|
||||||
found
|
|
||||||
)))
|
|
||||||
} else {
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
85
src/pool.rs
85
src/pool.rs
@@ -17,13 +17,10 @@ use std::sync::{
|
|||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
use crate::config::{
|
use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User};
|
||||||
get_config, Address, General, LoadBalancingMode, Plugins, PoolMode, Role, User,
|
|
||||||
};
|
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
|
|
||||||
use crate::auth_passthrough::AuthPassthrough;
|
use crate::auth_passthrough::AuthPassthrough;
|
||||||
use crate::plugins::prewarmer;
|
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
use crate::sharding::ShardingFunction;
|
use crate::sharding::ShardingFunction;
|
||||||
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
|
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
|
||||||
@@ -64,8 +61,6 @@ pub struct PoolIdentifier {
|
|||||||
pub user: String,
|
pub user: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
static POOL_REAPER_RATE: u64 = 30_000; // 30 seconds by default
|
|
||||||
|
|
||||||
impl PoolIdentifier {
|
impl PoolIdentifier {
|
||||||
/// Create a new user/pool identifier.
|
/// Create a new user/pool identifier.
|
||||||
pub fn new(db: &str, user: &str) -> PoolIdentifier {
|
pub fn new(db: &str, user: &str) -> PoolIdentifier {
|
||||||
@@ -96,7 +91,6 @@ pub struct PoolSettings {
|
|||||||
|
|
||||||
// Connecting user.
|
// Connecting user.
|
||||||
pub user: User,
|
pub user: User,
|
||||||
pub db: String,
|
|
||||||
|
|
||||||
// Default server role to connect to.
|
// Default server role to connect to.
|
||||||
pub default_role: Option<Role>,
|
pub default_role: Option<Role>,
|
||||||
@@ -135,9 +129,6 @@ pub struct PoolSettings {
|
|||||||
pub auth_query: Option<String>,
|
pub auth_query: Option<String>,
|
||||||
pub auth_query_user: Option<String>,
|
pub auth_query_user: Option<String>,
|
||||||
pub auth_query_password: Option<String>,
|
pub auth_query_password: Option<String>,
|
||||||
|
|
||||||
/// Plugins
|
|
||||||
pub plugins: Option<Plugins>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for PoolSettings {
|
impl Default for PoolSettings {
|
||||||
@@ -147,7 +138,6 @@ impl Default for PoolSettings {
|
|||||||
load_balancing_mode: LoadBalancingMode::Random,
|
load_balancing_mode: LoadBalancingMode::Random,
|
||||||
shards: 1,
|
shards: 1,
|
||||||
user: User::default(),
|
user: User::default(),
|
||||||
db: String::default(),
|
|
||||||
default_role: None,
|
default_role: None,
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
primary_reads_enabled: true,
|
primary_reads_enabled: true,
|
||||||
@@ -162,7 +152,6 @@ impl Default for PoolSettings {
|
|||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
plugins: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -202,7 +191,6 @@ pub struct ConnectionPool {
|
|||||||
paused: Arc<AtomicBool>,
|
paused: Arc<AtomicBool>,
|
||||||
paused_waiter: Arc<Notify>,
|
paused_waiter: Arc<Notify>,
|
||||||
|
|
||||||
/// Statistics.
|
|
||||||
pub stats: Arc<PoolStats>,
|
pub stats: Arc<PoolStats>,
|
||||||
|
|
||||||
/// AuthInfo
|
/// AuthInfo
|
||||||
@@ -360,10 +348,6 @@ impl ConnectionPool {
|
|||||||
client_server_map.clone(),
|
client_server_map.clone(),
|
||||||
pool_stats.clone(),
|
pool_stats.clone(),
|
||||||
pool_auth_hash.clone(),
|
pool_auth_hash.clone(),
|
||||||
match pool_config.plugins {
|
|
||||||
Some(ref plugins) => Some(plugins.clone()),
|
|
||||||
None => config.plugins.clone(),
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let connect_timeout = match pool_config.connect_timeout {
|
let connect_timeout = match pool_config.connect_timeout {
|
||||||
@@ -384,30 +368,15 @@ impl ConnectionPool {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let reaper_rate = *vec![idle_timeout, server_lifetime, POOL_REAPER_RATE]
|
|
||||||
.iter()
|
|
||||||
.min()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"[pool: {}][user: {}] Pool reaper rate: {}ms",
|
|
||||||
pool_name, user.username, reaper_rate
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = Pool::builder()
|
let pool = Pool::builder()
|
||||||
.max_size(user.pool_size)
|
.max_size(user.pool_size)
|
||||||
.min_idle(user.min_pool_size)
|
.min_idle(user.min_pool_size)
|
||||||
.connection_timeout(std::time::Duration::from_millis(connect_timeout))
|
.connection_timeout(std::time::Duration::from_millis(connect_timeout))
|
||||||
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
|
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
|
||||||
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
|
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
|
||||||
.reaper_rate(std::time::Duration::from_millis(reaper_rate))
|
.test_on_check_out(false)
|
||||||
.test_on_check_out(false);
|
.build(manager)
|
||||||
|
.await?;
|
||||||
let pool = if config.general.validate_config {
|
|
||||||
pool.build(manager).await?
|
|
||||||
} else {
|
|
||||||
pool.build_unchecked(manager)
|
|
||||||
};
|
|
||||||
|
|
||||||
pools.push(pool);
|
pools.push(pool);
|
||||||
servers.push(address);
|
servers.push(address);
|
||||||
@@ -443,7 +412,6 @@ impl ConnectionPool {
|
|||||||
// shards: pool_config.shards.clone(),
|
// shards: pool_config.shards.clone(),
|
||||||
shards: shard_ids.len(),
|
shards: shard_ids.len(),
|
||||||
user: user.clone(),
|
user: user.clone(),
|
||||||
db: pool_name.clone(),
|
|
||||||
default_role: match pool_config.default_role.as_str() {
|
default_role: match pool_config.default_role.as_str() {
|
||||||
"any" => None,
|
"any" => None,
|
||||||
"replica" => Some(Role::Replica),
|
"replica" => Some(Role::Replica),
|
||||||
@@ -469,10 +437,6 @@ impl ConnectionPool {
|
|||||||
auth_query: pool_config.auth_query.clone(),
|
auth_query: pool_config.auth_query.clone(),
|
||||||
auth_query_user: pool_config.auth_query_user.clone(),
|
auth_query_user: pool_config.auth_query_user.clone(),
|
||||||
auth_query_password: pool_config.auth_query_password.clone(),
|
auth_query_password: pool_config.auth_query_password.clone(),
|
||||||
plugins: match pool_config.plugins {
|
|
||||||
Some(ref plugins) => Some(plugins.clone()),
|
|
||||||
None => config.plugins.clone(),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
validated: Arc::new(AtomicBool::new(false)),
|
validated: Arc::new(AtomicBool::new(false)),
|
||||||
paused: Arc::new(AtomicBool::new(false)),
|
paused: Arc::new(AtomicBool::new(false)),
|
||||||
@@ -482,12 +446,10 @@ impl ConnectionPool {
|
|||||||
// Connect to the servers to make sure pool configuration is valid
|
// Connect to the servers to make sure pool configuration is valid
|
||||||
// before setting it globally.
|
// before setting it globally.
|
||||||
// Do this async and somewhere else, we don't have to wait here.
|
// Do this async and somewhere else, we don't have to wait here.
|
||||||
if config.general.validate_config {
|
let mut validate_pool = pool.clone();
|
||||||
let mut validate_pool = pool.clone();
|
tokio::task::spawn(async move {
|
||||||
tokio::task::spawn(async move {
|
let _ = validate_pool.validate().await;
|
||||||
let _ = validate_pool.validate().await;
|
});
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// There is one pool per database/user pair.
|
// There is one pool per database/user pair.
|
||||||
new_pools.insert(PoolIdentifier::new(pool_name, &user.username), pool);
|
new_pools.insert(PoolIdentifier::new(pool_name, &user.username), pool);
|
||||||
@@ -638,10 +600,7 @@ impl ConnectionPool {
|
|||||||
{
|
{
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!("Banning instance {:?}, error: {:?}", address, err);
|
||||||
"Connection checkout error for instance {:?}, error: {:?}",
|
|
||||||
address, err
|
|
||||||
);
|
|
||||||
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
|
self.ban(address, BanReason::FailedCheckout, Some(client_stats));
|
||||||
address.stats.error();
|
address.stats.error();
|
||||||
client_stats.idle();
|
client_stats.idle();
|
||||||
@@ -717,7 +676,7 @@ impl ConnectionPool {
|
|||||||
// Health check failed.
|
// Health check failed.
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Failed health check on instance {:?}, error: {:?}",
|
"Banning instance {:?} because of failed health check, {:?}",
|
||||||
address, err
|
address, err
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -726,7 +685,7 @@ impl ConnectionPool {
|
|||||||
// Health check timed out.
|
// Health check timed out.
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
"Health check timeout on instance {:?}, error: {:?}",
|
"Banning instance {:?} because of health check timeout, {:?}",
|
||||||
address, err
|
address, err
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -748,16 +707,13 @@ impl ConnectionPool {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
error!("Banning instance {:?}, reason: {:?}", address, reason);
|
|
||||||
|
|
||||||
let now = chrono::offset::Utc::now().naive_utc();
|
let now = chrono::offset::Utc::now().naive_utc();
|
||||||
let mut guard = self.banlist.write();
|
let mut guard = self.banlist.write();
|
||||||
|
error!("Banning {:?}", address);
|
||||||
if let Some(client_info) = client_info {
|
if let Some(client_info) = client_info {
|
||||||
client_info.ban_error();
|
client_info.ban_error();
|
||||||
address.stats.error();
|
address.stats.error();
|
||||||
}
|
}
|
||||||
|
|
||||||
guard[address.shard].insert(address.clone(), (reason, now));
|
guard[address.shard].insert(address.clone(), (reason, now));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -920,7 +876,6 @@ pub struct ServerPool {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
stats: Arc<PoolStats>,
|
stats: Arc<PoolStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
plugins: Option<Plugins>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerPool {
|
impl ServerPool {
|
||||||
@@ -931,7 +886,6 @@ impl ServerPool {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
stats: Arc<PoolStats>,
|
stats: Arc<PoolStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
plugins: Option<Plugins>,
|
|
||||||
) -> ServerPool {
|
) -> ServerPool {
|
||||||
ServerPool {
|
ServerPool {
|
||||||
address,
|
address,
|
||||||
@@ -940,7 +894,6 @@ impl ServerPool {
|
|||||||
client_server_map,
|
client_server_map,
|
||||||
stats,
|
stats,
|
||||||
auth_hash,
|
auth_hash,
|
||||||
plugins,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -973,19 +926,7 @@ impl ManageConnection for ServerPool {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(mut conn) => {
|
Ok(conn) => {
|
||||||
if let Some(ref plugins) = self.plugins {
|
|
||||||
if let Some(ref prewarmer) = plugins.prewarmer {
|
|
||||||
let mut prewarmer = prewarmer::Prewarmer {
|
|
||||||
enabled: prewarmer.enabled,
|
|
||||||
server: &mut conn,
|
|
||||||
queries: &prewarmer.queries,
|
|
||||||
};
|
|
||||||
|
|
||||||
prewarmer.run().await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stats.idle();
|
stats.idle();
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,16 +6,13 @@ use once_cell::sync::OnceCell;
|
|||||||
use regex::{Regex, RegexSet};
|
use regex::{Regex, RegexSet};
|
||||||
use sqlparser::ast::Statement::{Query, StartTransaction};
|
use sqlparser::ast::Statement::{Query, StartTransaction};
|
||||||
use sqlparser::ast::{
|
use sqlparser::ast::{
|
||||||
BinaryOperator, Expr, Ident, JoinConstraint, JoinOperator, SetExpr, Statement, TableFactor,
|
BinaryOperator, Expr, Ident, JoinConstraint, JoinOperator, SetExpr, TableFactor, Value,
|
||||||
Value,
|
|
||||||
};
|
};
|
||||||
use sqlparser::dialect::PostgreSqlDialect;
|
use sqlparser::dialect::PostgreSqlDialect;
|
||||||
use sqlparser::parser::Parser;
|
use sqlparser::parser::Parser;
|
||||||
|
|
||||||
use crate::config::Role;
|
use crate::config::Role;
|
||||||
use crate::errors::Error;
|
|
||||||
use crate::messages::BytesMutReader;
|
use crate::messages::BytesMutReader;
|
||||||
use crate::plugins::{Intercept, Plugin, PluginOutput, QueryLogger, TableAccess};
|
|
||||||
use crate::pool::PoolSettings;
|
use crate::pool::PoolSettings;
|
||||||
use crate::sharding::Sharder;
|
use crate::sharding::Sharder;
|
||||||
|
|
||||||
@@ -132,10 +129,6 @@ impl QueryRouter {
|
|||||||
self.pool_settings = pool_settings;
|
self.pool_settings = pool_settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn pool_settings<'a>(&'a self) -> &'a PoolSettings {
|
|
||||||
&self.pool_settings
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to parse a command and execute it.
|
/// Try to parse a command and execute it.
|
||||||
pub fn try_execute_command(&mut self, message_buffer: &BytesMut) -> Option<(Command, String)> {
|
pub fn try_execute_command(&mut self, message_buffer: &BytesMut) -> Option<(Command, String)> {
|
||||||
let mut message_cursor = Cursor::new(message_buffer);
|
let mut message_cursor = Cursor::new(message_buffer);
|
||||||
@@ -331,7 +324,10 @@ impl QueryRouter {
|
|||||||
Some((command, value))
|
Some((command, value))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse(message: &BytesMut) -> Result<Vec<sqlparser::ast::Statement>, Error> {
|
/// Try to infer which server to connect to based on the contents of the query.
|
||||||
|
pub fn infer(&mut self, message: &BytesMut) -> bool {
|
||||||
|
debug!("Inferring role");
|
||||||
|
|
||||||
let mut message_cursor = Cursor::new(message);
|
let mut message_cursor = Cursor::new(message);
|
||||||
|
|
||||||
let code = message_cursor.get_u8() as char;
|
let code = message_cursor.get_u8() as char;
|
||||||
@@ -357,29 +353,28 @@ impl QueryRouter {
|
|||||||
query
|
query
|
||||||
}
|
}
|
||||||
|
|
||||||
_ => return Err(Error::UnsupportedStatement),
|
_ => return false,
|
||||||
};
|
};
|
||||||
|
|
||||||
match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
|
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
|
||||||
Ok(ast) => Ok(ast),
|
Ok(ast) => ast,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
// SELECT ... FOR UPDATE won't get parsed correctly.
|
||||||
debug!("{}: {}", err, query);
|
debug!("{}: {}", err, query);
|
||||||
Err(Error::QueryRouterParserError(err.to_string()))
|
self.active_role = Some(Role::Primary);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to infer which server to connect to based on the contents of the query.
|
debug!("AST: {:?}", ast);
|
||||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
|
||||||
debug!("Inferring role");
|
|
||||||
|
|
||||||
if ast.is_empty() {
|
if ast.is_empty() {
|
||||||
// That's weird, no idea, let's go to primary
|
// That's weird, no idea, let's go to primary
|
||||||
self.active_role = Some(Role::Primary);
|
self.active_role = Some(Role::Primary);
|
||||||
return Err(Error::QueryRouterParserError("empty query".into()));
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
for q in ast {
|
for q in &ast {
|
||||||
match q {
|
match q {
|
||||||
// All transactions go to the primary, probably a write.
|
// All transactions go to the primary, probably a write.
|
||||||
StartTransaction { .. } => {
|
StartTransaction { .. } => {
|
||||||
@@ -423,7 +418,7 @@ impl QueryRouter {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the shard number from the Bind message
|
/// Parse the shard number from the Bind message
|
||||||
@@ -788,52 +783,6 @@ impl QueryRouter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add your plugins here and execute them.
|
|
||||||
pub async fn execute_plugins(&self, ast: &Vec<Statement>) -> Result<PluginOutput, Error> {
|
|
||||||
let plugins = match self.pool_settings.plugins {
|
|
||||||
Some(ref plugins) => plugins,
|
|
||||||
None => return Ok(PluginOutput::Allow),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(ref query_logger) = plugins.query_logger {
|
|
||||||
let mut query_logger = QueryLogger {
|
|
||||||
enabled: query_logger.enabled,
|
|
||||||
user: &self.pool_settings.user.username,
|
|
||||||
db: &self.pool_settings.db,
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = query_logger.run(&self, ast).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ref intercept) = plugins.intercept {
|
|
||||||
let mut intercept = Intercept {
|
|
||||||
enabled: intercept.enabled,
|
|
||||||
config: &intercept,
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = intercept.run(&self, ast).await;
|
|
||||||
|
|
||||||
if let Ok(PluginOutput::Intercept(output)) = result {
|
|
||||||
return Ok(PluginOutput::Intercept(output));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ref table_access) = plugins.table_access {
|
|
||||||
let mut table_access = TableAccess {
|
|
||||||
enabled: table_access.enabled,
|
|
||||||
tables: &table_access.tables,
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = table_access.run(&self, ast).await;
|
|
||||||
|
|
||||||
if let Ok(PluginOutput::Deny(error)) = result {
|
|
||||||
return Ok(PluginOutput::Deny(error));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(PluginOutput::Allow)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_sharding_key(&mut self, sharding_key: i64) -> Option<usize> {
|
fn set_sharding_key(&mut self, sharding_key: i64) -> Option<usize> {
|
||||||
let sharder = Sharder::new(
|
let sharder = Sharder::new(
|
||||||
self.pool_settings.shards,
|
self.pool_settings.shards,
|
||||||
@@ -861,23 +810,12 @@ impl QueryRouter {
|
|||||||
/// Should we attempt to parse queries?
|
/// Should we attempt to parse queries?
|
||||||
pub fn query_parser_enabled(&self) -> bool {
|
pub fn query_parser_enabled(&self) -> bool {
|
||||||
let enabled = match self.query_parser_enabled {
|
let enabled = match self.query_parser_enabled {
|
||||||
None => {
|
None => self.pool_settings.query_parser_enabled,
|
||||||
debug!(
|
Some(value) => value,
|
||||||
"Using pool settings, query_parser_enabled: {}",
|
|
||||||
self.pool_settings.query_parser_enabled
|
|
||||||
);
|
|
||||||
self.pool_settings.query_parser_enabled
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(value) => {
|
|
||||||
debug!(
|
|
||||||
"Using query parser override, query_parser_enabled: {}",
|
|
||||||
value
|
|
||||||
);
|
|
||||||
value
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
debug!("Query parser enabled: {}", enabled);
|
||||||
|
|
||||||
enabled
|
enabled
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -924,7 +862,7 @@ mod test {
|
|||||||
|
|
||||||
for query in queries {
|
for query in queries {
|
||||||
// It's a recognized query
|
// It's a recognized query
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Replica));
|
assert_eq!(qr.role(), Some(Role::Replica));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -943,7 +881,7 @@ mod test {
|
|||||||
|
|
||||||
for query in queries {
|
for query in queries {
|
||||||
// It's a recognized query
|
// It's a recognized query
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Primary));
|
assert_eq!(qr.role(), Some(Role::Primary));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -955,7 +893,7 @@ mod test {
|
|||||||
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
||||||
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO on")) != None);
|
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO on")) != None);
|
||||||
|
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), None);
|
assert_eq!(qr.role(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -975,7 +913,7 @@ mod test {
|
|||||||
res.put(prepared_stmt);
|
res.put(prepared_stmt);
|
||||||
res.put_i16(0);
|
res.put_i16(0);
|
||||||
|
|
||||||
assert!(qr.infer(&QueryRouter::parse(&res).unwrap()).is_ok());
|
assert!(qr.infer(&res));
|
||||||
assert_eq!(qr.role(), Some(Role::Replica));
|
assert_eq!(qr.role(), Some(Role::Replica));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1139,11 +1077,11 @@ mod test {
|
|||||||
assert_eq!(qr.role(), None);
|
assert_eq!(qr.role(), None);
|
||||||
|
|
||||||
let query = simple_query("INSERT INTO test_table VALUES (1)");
|
let query = simple_query("INSERT INTO test_table VALUES (1)");
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Primary));
|
assert_eq!(qr.role(), Some(Role::Primary));
|
||||||
|
|
||||||
let query = simple_query("SELECT * FROM test_table");
|
let query = simple_query("SELECT * FROM test_table");
|
||||||
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
|
assert!(qr.infer(&query));
|
||||||
assert_eq!(qr.role(), Some(Role::Replica));
|
assert_eq!(qr.role(), Some(Role::Replica));
|
||||||
|
|
||||||
assert!(qr.query_parser_enabled());
|
assert!(qr.query_parser_enabled());
|
||||||
@@ -1175,8 +1113,6 @@ mod test {
|
|||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
db: "test".to_string(),
|
|
||||||
plugins: None,
|
|
||||||
};
|
};
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
assert_eq!(qr.active_role, None);
|
assert_eq!(qr.active_role, None);
|
||||||
@@ -1206,24 +1142,15 @@ mod test {
|
|||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("BEGIN; SELECT 1; COMMIT;")));
|
||||||
.infer(&QueryRouter::parse(&simple_query("BEGIN; SELECT 1; COMMIT;")).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.role(), Role::Primary);
|
assert_eq!(qr.role(), Role::Primary);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("SELECT 1; SELECT 2;")));
|
||||||
.infer(&QueryRouter::parse(&simple_query("SELECT 1; SELECT 2;")).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.role(), Role::Replica);
|
assert_eq!(qr.role(), Role::Replica);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.role(), Role::Primary);
|
assert_eq!(qr.role(), Role::Primary);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1250,10 +1177,7 @@ mod test {
|
|||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_password: None,
|
auth_query_password: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
db: "test".to_string(),
|
|
||||||
plugins: None,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
let mut qr = QueryRouter::new();
|
||||||
qr.update_pool_settings(pool_settings.clone());
|
qr.update_pool_settings(pool_settings.clone());
|
||||||
|
|
||||||
@@ -1284,84 +1208,47 @@ mod test {
|
|||||||
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
||||||
qr.pool_settings.shards = 3;
|
qr.pool_settings.shards = 3;
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("SELECT * FROM data WHERE id = 5")));
|
||||||
.infer(&QueryRouter::parse(&simple_query("SELECT * FROM data WHERE id = 5")).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT one, two, three FROM public.data WHERE id = 6"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT one, two, three FROM public.data WHERE id = 6"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT * FROM data
|
||||||
&QueryRouter::parse(&simple_query(
|
|
||||||
"SELECT * FROM data
|
|
||||||
INNER JOIN t2 ON data.id = 5
|
INNER JOIN t2 ON data.id = 5
|
||||||
AND t2.data_id = data.id
|
AND t2.data_id = data.id
|
||||||
WHERE data.id = 5"
|
WHERE data.id = 5"
|
||||||
))
|
)));
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
// Shard did not move because we couldn't determine the sharding key since it could be ambiguous
|
// Shard did not move because we couldn't determine the sharding key since it could be ambiguous
|
||||||
// in the query.
|
// in the query.
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT * FROM t2 INNER JOIN data ON id = 6 AND data.id = t2.data_id"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT * FROM t2 INNER JOIN data ON id = 6 AND data.id = t2.data_id"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
r#"SELECT * FROM "public"."data" WHERE "id" = 6"#
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
r#"SELECT * FROM "public"."data" WHERE "id" = 6"#
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
r#"SELECT * FROM "public"."data" WHERE "data"."id" = 5"#
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
r#"SELECT * FROM "public"."data" WHERE "data"."id" = 5"#
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
|
|
||||||
// Super unique sharding key
|
// Super unique sharding key
|
||||||
qr.pool_settings.automatic_sharding_key = Some("*.unique_enough_column_name".to_string());
|
qr.pool_settings.automatic_sharding_key = Some("*.unique_enough_column_name".to_string());
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(
|
||||||
.infer(
|
"SELECT * FROM table_x WHERE unique_enough_column_name = 6"
|
||||||
&QueryRouter::parse(&simple_query(
|
)));
|
||||||
"SELECT * FROM table_x WHERE unique_enough_column_name = 6"
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query("SELECT * FROM table_y WHERE another_key = 5")));
|
||||||
.infer(
|
|
||||||
&QueryRouter::parse(&simple_query("SELECT * FROM table_y WHERE another_key = 5"))
|
|
||||||
.unwrap()
|
|
||||||
)
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.shard(), 0);
|
assert_eq!(qr.shard(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1385,61 +1272,11 @@ mod test {
|
|||||||
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
|
||||||
qr.pool_settings.shards = 3;
|
qr.pool_settings.shards = 3;
|
||||||
|
|
||||||
assert!(qr
|
assert!(qr.infer(&simple_query(stmt)));
|
||||||
.infer(&QueryRouter::parse(&simple_query(stmt)).unwrap())
|
|
||||||
.is_ok());
|
|
||||||
assert_eq!(qr.placeholders.len(), 1);
|
assert_eq!(qr.placeholders.len(), 1);
|
||||||
|
|
||||||
assert!(qr.infer_shard_from_bind(&bind));
|
assert!(qr.infer_shard_from_bind(&bind));
|
||||||
assert_eq!(qr.shard(), 2);
|
assert_eq!(qr.shard(), 2);
|
||||||
assert!(qr.placeholders.is_empty());
|
assert!(qr.placeholders.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_table_access_plugin() {
|
|
||||||
use crate::config::{Plugins, TableAccess};
|
|
||||||
let table_access = TableAccess {
|
|
||||||
enabled: true,
|
|
||||||
tables: vec![String::from("pg_database")],
|
|
||||||
};
|
|
||||||
let plugins = Plugins {
|
|
||||||
table_access: Some(table_access),
|
|
||||||
intercept: None,
|
|
||||||
query_logger: None,
|
|
||||||
prewarmer: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
QueryRouter::setup();
|
|
||||||
let mut pool_settings = PoolSettings::default();
|
|
||||||
pool_settings.query_parser_enabled = true;
|
|
||||||
pool_settings.plugins = Some(plugins);
|
|
||||||
|
|
||||||
let mut qr = QueryRouter::new();
|
|
||||||
qr.update_pool_settings(pool_settings);
|
|
||||||
|
|
||||||
let query = simple_query("SELECT * FROM pg_database");
|
|
||||||
let ast = QueryRouter::parse(&query).unwrap();
|
|
||||||
|
|
||||||
let res = qr.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
res,
|
|
||||||
Ok(PluginOutput::Deny(
|
|
||||||
"permission for table \"pg_database\" denied".to_string()
|
|
||||||
))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_plugins_disabled_by_defaault() {
|
|
||||||
QueryRouter::setup();
|
|
||||||
let qr = QueryRouter::new();
|
|
||||||
|
|
||||||
let query = simple_query("SELECT * FROM pg_database");
|
|
||||||
let ast = QueryRouter::parse(&query).unwrap();
|
|
||||||
|
|
||||||
let res = qr.execute_plugins(&ast).await;
|
|
||||||
|
|
||||||
assert_eq!(res, Ok(PluginOutput::Allow));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
131
src/server.rs
131
src/server.rs
@@ -7,7 +7,6 @@ use parking_lot::{Mutex, RwLock};
|
|||||||
use postgres_protocol::message;
|
use postgres_protocol::message;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
|
||||||
@@ -17,7 +16,6 @@ use tokio_rustls::{client::TlsStream, TlsConnector};
|
|||||||
|
|
||||||
use crate::config::{get_config, Address, User};
|
use crate::config::{get_config, Address, User};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::dns_cache::{AddrSet, CACHED_RESOLVER};
|
|
||||||
use crate::errors::{Error, ServerIdentifier};
|
use crate::errors::{Error, ServerIdentifier};
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::mirrors::MirroringManager;
|
use crate::mirrors::MirroringManager;
|
||||||
@@ -103,48 +101,6 @@ impl StreamInner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
|
||||||
struct CleanupState {
|
|
||||||
/// If server connection requires DISCARD ALL before checkin because of set statement
|
|
||||||
needs_cleanup_set: bool,
|
|
||||||
|
|
||||||
/// If server connection requires DISCARD ALL before checkin because of prepare statement
|
|
||||||
needs_cleanup_prepare: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CleanupState {
|
|
||||||
fn new() -> Self {
|
|
||||||
CleanupState {
|
|
||||||
needs_cleanup_set: false,
|
|
||||||
needs_cleanup_prepare: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn needs_cleanup(&self) -> bool {
|
|
||||||
self.needs_cleanup_set || self.needs_cleanup_prepare
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_true(&mut self) {
|
|
||||||
self.needs_cleanup_set = true;
|
|
||||||
self.needs_cleanup_prepare = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reset(&mut self) {
|
|
||||||
self.needs_cleanup_set = false;
|
|
||||||
self.needs_cleanup_prepare = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for CleanupState {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"SET: {}, PREPARE: {}",
|
|
||||||
self.needs_cleanup_set, self.needs_cleanup_prepare
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Server state.
|
/// Server state.
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
/// Server host, e.g. localhost,
|
/// Server host, e.g. localhost,
|
||||||
@@ -173,8 +129,8 @@ pub struct Server {
|
|||||||
/// Is the server broken? We'll remote it from the pool if so.
|
/// Is the server broken? We'll remote it from the pool if so.
|
||||||
bad: bool,
|
bad: bool,
|
||||||
|
|
||||||
/// If server connection requires DISCARD ALL before checkin
|
/// If server connection requires a DISCARD ALL before checkin
|
||||||
cleanup_state: CleanupState,
|
needs_cleanup: bool,
|
||||||
|
|
||||||
/// Mapping of clients and servers used for query cancellation.
|
/// Mapping of clients and servers used for query cancellation.
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
@@ -192,9 +148,6 @@ pub struct Server {
|
|||||||
last_activity: SystemTime,
|
last_activity: SystemTime,
|
||||||
|
|
||||||
mirror_manager: Option<MirroringManager>,
|
mirror_manager: Option<MirroringManager>,
|
||||||
|
|
||||||
// Associated addresses used
|
|
||||||
addr_set: Option<AddrSet>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
@@ -208,24 +161,6 @@ impl Server {
|
|||||||
stats: Arc<ServerStats>,
|
stats: Arc<ServerStats>,
|
||||||
auth_hash: Arc<RwLock<Option<String>>>,
|
auth_hash: Arc<RwLock<Option<String>>>,
|
||||||
) -> Result<Server, Error> {
|
) -> Result<Server, Error> {
|
||||||
let cached_resolver = CACHED_RESOLVER.load();
|
|
||||||
let mut addr_set: Option<AddrSet> = None;
|
|
||||||
|
|
||||||
// If we are caching addresses and hostname is not an IP
|
|
||||||
if cached_resolver.enabled() && address.host.parse::<IpAddr>().is_err() {
|
|
||||||
debug!("Resolving {}", &address.host);
|
|
||||||
addr_set = match cached_resolver.lookup_ip(&address.host).await {
|
|
||||||
Ok(ok) => {
|
|
||||||
debug!("Obtained: {:?}", ok);
|
|
||||||
Some(ok)
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Error trying to resolve {}, ({:?})", &address.host, err);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut stream =
|
let mut stream =
|
||||||
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
|
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
|
||||||
Ok(stream) => stream,
|
Ok(stream) => stream,
|
||||||
@@ -672,9 +607,8 @@ impl Server {
|
|||||||
in_transaction: false,
|
in_transaction: false,
|
||||||
data_available: false,
|
data_available: false,
|
||||||
bad: false,
|
bad: false,
|
||||||
cleanup_state: CleanupState::new(),
|
needs_cleanup: false,
|
||||||
client_server_map,
|
client_server_map,
|
||||||
addr_set,
|
|
||||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||||
stats,
|
stats,
|
||||||
application_name: String::new(),
|
application_name: String::new(),
|
||||||
@@ -747,10 +681,7 @@ impl Server {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!("Terminating server because of: {:?}", err);
|
||||||
"Terminating server {:?} because of: {:?}",
|
|
||||||
self.address, err
|
|
||||||
);
|
|
||||||
self.bad = true;
|
self.bad = true;
|
||||||
Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
@@ -765,10 +696,7 @@ impl Server {
|
|||||||
let mut message = match read_message(&mut self.stream).await {
|
let mut message = match read_message(&mut self.stream).await {
|
||||||
Ok(message) => message,
|
Ok(message) => message,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!("Terminating server because of: {:?}", err);
|
||||||
"Terminating server {:?} because of: {:?}",
|
|
||||||
self.address, err
|
|
||||||
);
|
|
||||||
self.bad = true;
|
self.bad = true;
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
@@ -835,12 +763,12 @@ impl Server {
|
|||||||
// This will reduce amount of discard statements sent
|
// This will reduce amount of discard statements sent
|
||||||
if !self.in_transaction {
|
if !self.in_transaction {
|
||||||
debug!("Server connection marked for clean up");
|
debug!("Server connection marked for clean up");
|
||||||
self.cleanup_state.needs_cleanup_set = true;
|
self.needs_cleanup = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"PREPARE\0" => {
|
"PREPARE\0" => {
|
||||||
debug!("Server connection marked for clean up");
|
debug!("Server connection marked for clean up");
|
||||||
self.cleanup_state.needs_cleanup_prepare = true;
|
self.needs_cleanup = true;
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
@@ -921,23 +849,7 @@ impl Server {
|
|||||||
/// Server & client are out of sync, we must discard this connection.
|
/// Server & client are out of sync, we must discard this connection.
|
||||||
/// This happens with clients that misbehave.
|
/// This happens with clients that misbehave.
|
||||||
pub fn is_bad(&self) -> bool {
|
pub fn is_bad(&self) -> bool {
|
||||||
if self.bad {
|
self.bad
|
||||||
return self.bad;
|
|
||||||
};
|
|
||||||
let cached_resolver = CACHED_RESOLVER.load();
|
|
||||||
if cached_resolver.enabled() {
|
|
||||||
if let Some(addr_set) = &self.addr_set {
|
|
||||||
if cached_resolver.has_changed(self.address.host.as_str(), addr_set) {
|
|
||||||
warn!(
|
|
||||||
"DNS changed for {}, it was {:?}. Dropping server connection.",
|
|
||||||
self.address.host.as_str(),
|
|
||||||
addr_set
|
|
||||||
);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get server startup information to forward it to the client.
|
/// Get server startup information to forward it to the client.
|
||||||
@@ -970,8 +882,6 @@ impl Server {
|
|||||||
/// It will use the simple query protocol.
|
/// It will use the simple query protocol.
|
||||||
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
||||||
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
|
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
|
||||||
debug!("Running `{}` on server {:?}", query, self.address);
|
|
||||||
|
|
||||||
let query = simple_query(query);
|
let query = simple_query(query);
|
||||||
|
|
||||||
self.send(&query).await?;
|
self.send(&query).await?;
|
||||||
@@ -1004,11 +914,10 @@ impl Server {
|
|||||||
// to avoid leaking state between clients. For performance reasons we only
|
// to avoid leaking state between clients. For performance reasons we only
|
||||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||||
// it before each checkin.
|
// it before each checkin.
|
||||||
if self.cleanup_state.needs_cleanup() {
|
if self.needs_cleanup {
|
||||||
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
|
warn!("Server returned with session state altered, discarding state");
|
||||||
self.query("DISCARD ALL").await?;
|
self.query("DISCARD ALL").await?;
|
||||||
self.query("RESET ROLE").await?;
|
self.needs_cleanup = false;
|
||||||
self.cleanup_state.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -1020,12 +929,12 @@ impl Server {
|
|||||||
self.application_name = name.to_string();
|
self.application_name = name.to_string();
|
||||||
// We don't want `SET application_name` to mark the server connection
|
// We don't want `SET application_name` to mark the server connection
|
||||||
// as needing cleanup
|
// as needing cleanup
|
||||||
let needs_cleanup_before = self.cleanup_state;
|
let needs_cleanup_before = self.needs_cleanup;
|
||||||
|
|
||||||
let result = Ok(self
|
let result = Ok(self
|
||||||
.query(&format!("SET application_name = '{}'", name))
|
.query(&format!("SET application_name = '{}'", name))
|
||||||
.await?);
|
.await?);
|
||||||
self.cleanup_state = needs_cleanup_before;
|
self.needs_cleanup = needs_cleanup_before;
|
||||||
result
|
result
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -1050,7 +959,7 @@ impl Server {
|
|||||||
|
|
||||||
// Marks a connection as needing DISCARD ALL at checkin
|
// Marks a connection as needing DISCARD ALL at checkin
|
||||||
pub fn mark_dirty(&mut self) {
|
pub fn mark_dirty(&mut self) {
|
||||||
self.cleanup_state.set_true();
|
self.needs_cleanup = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mirror_send(&mut self, bytes: &BytesMut) {
|
pub fn mirror_send(&mut self, bytes: &BytesMut) {
|
||||||
@@ -1186,18 +1095,14 @@ impl Drop for Server {
|
|||||||
_ => debug!("Dirty shutdown"),
|
_ => debug!("Dirty shutdown"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Should not matter.
|
||||||
|
self.bad = true;
|
||||||
|
|
||||||
let now = chrono::offset::Utc::now().naive_utc();
|
let now = chrono::offset::Utc::now().naive_utc();
|
||||||
let duration = now - self.connected_at;
|
let duration = now - self.connected_at;
|
||||||
|
|
||||||
let message = if self.bad {
|
|
||||||
"Server connection terminated"
|
|
||||||
} else {
|
|
||||||
"Server connection closed"
|
|
||||||
};
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"{} {:?}, session duration: {}",
|
"Server connection closed {:?}, session duration: {}",
|
||||||
message,
|
|
||||||
self.address,
|
self.address,
|
||||||
crate::format_duration(&duration)
|
crate::format_duration(&duration)
|
||||||
);
|
);
|
||||||
|
|||||||
15
src/stats.rs
15
src/stats.rs
@@ -107,19 +107,8 @@ impl Collector {
|
|||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
||||||
// Hold read lock for duration of update to retain all server stats
|
for stats in SERVER_STATS.read().values() {
|
||||||
let server_stats = SERVER_STATS.read();
|
stats.address_stats().update_averages();
|
||||||
|
|
||||||
for stats in server_stats.values() {
|
|
||||||
if !stats.check_address_stat_average_is_updated_status() {
|
|
||||||
stats.address_stats().update_averages();
|
|
||||||
stats.set_address_stat_average_is_updated_status(true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset to false for next update
|
|
||||||
for stats in server_stats.values() {
|
|
||||||
stats.set_address_stat_average_is_updated_status(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use log::warn;
|
||||||
use std::sync::atomic::*;
|
use std::sync::atomic::*;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -12,16 +13,6 @@ pub struct AddressStats {
|
|||||||
pub total_query_time: Arc<AtomicU64>,
|
pub total_query_time: Arc<AtomicU64>,
|
||||||
pub total_wait_time: Arc<AtomicU64>,
|
pub total_wait_time: Arc<AtomicU64>,
|
||||||
pub total_errors: Arc<AtomicU64>,
|
pub total_errors: Arc<AtomicU64>,
|
||||||
|
|
||||||
pub old_total_xact_count: Arc<AtomicU64>,
|
|
||||||
pub old_total_query_count: Arc<AtomicU64>,
|
|
||||||
pub old_total_received: Arc<AtomicU64>,
|
|
||||||
pub old_total_sent: Arc<AtomicU64>,
|
|
||||||
pub old_total_xact_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_query_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_wait_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_errors: Arc<AtomicU64>,
|
|
||||||
|
|
||||||
pub avg_query_count: Arc<AtomicU64>,
|
pub avg_query_count: Arc<AtomicU64>,
|
||||||
pub avg_query_time: Arc<AtomicU64>,
|
pub avg_query_time: Arc<AtomicU64>,
|
||||||
pub avg_recv: Arc<AtomicU64>,
|
pub avg_recv: Arc<AtomicU64>,
|
||||||
@@ -30,9 +21,6 @@ pub struct AddressStats {
|
|||||||
pub avg_xact_time: Arc<AtomicU64>,
|
pub avg_xact_time: Arc<AtomicU64>,
|
||||||
pub avg_xact_count: Arc<AtomicU64>,
|
pub avg_xact_count: Arc<AtomicU64>,
|
||||||
pub avg_wait_time: Arc<AtomicU64>,
|
pub avg_wait_time: Arc<AtomicU64>,
|
||||||
|
|
||||||
// Determines if the averages have been updated since the last time they were reported
|
|
||||||
pub averages_updated: Arc<AtomicBool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoIterator for AddressStats {
|
impl IntoIterator for AddressStats {
|
||||||
@@ -116,15 +104,16 @@ impl AddressStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_averages(&self) {
|
pub fn update_averages(&self) {
|
||||||
let (totals, averages, old_totals) = self.fields_iterators();
|
let (totals, averages) = self.fields_iterators();
|
||||||
for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) {
|
for data in totals.iter().zip(averages.iter()) {
|
||||||
let total_value = total.load(Ordering::Relaxed);
|
let (total, average) = data;
|
||||||
let old_total_value = old_total.load(Ordering::Relaxed);
|
if let Err(err) = average.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
|
||||||
average.store(
|
let total = total.load(Ordering::Relaxed);
|
||||||
(total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000),
|
let avg = (total - avg) / (crate::stats::STAT_PERIOD / 1_000); // Avg / second
|
||||||
Ordering::Relaxed,
|
Some(avg)
|
||||||
); // Avg / second
|
}) {
|
||||||
old_total.store(total_value, Ordering::Relaxed);
|
warn!("Could not update averages for addresses stats, {:?}", err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,42 +123,27 @@ impl AddressStats {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fields_iterators(
|
fn fields_iterators(&self) -> (Vec<Arc<AtomicU64>>, Vec<Arc<AtomicU64>>) {
|
||||||
&self,
|
|
||||||
) -> (
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
) {
|
|
||||||
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
|
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
|
||||||
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
|
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
|
||||||
let mut old_totals: Vec<Arc<AtomicU64>> = Vec::new();
|
|
||||||
|
|
||||||
totals.push(self.total_xact_count.clone());
|
totals.push(self.total_xact_count.clone());
|
||||||
old_totals.push(self.old_total_xact_count.clone());
|
|
||||||
averages.push(self.avg_xact_count.clone());
|
averages.push(self.avg_xact_count.clone());
|
||||||
totals.push(self.total_query_count.clone());
|
totals.push(self.total_query_count.clone());
|
||||||
old_totals.push(self.old_total_query_count.clone());
|
|
||||||
averages.push(self.avg_query_count.clone());
|
averages.push(self.avg_query_count.clone());
|
||||||
totals.push(self.total_received.clone());
|
totals.push(self.total_received.clone());
|
||||||
old_totals.push(self.old_total_received.clone());
|
|
||||||
averages.push(self.avg_recv.clone());
|
averages.push(self.avg_recv.clone());
|
||||||
totals.push(self.total_sent.clone());
|
totals.push(self.total_sent.clone());
|
||||||
old_totals.push(self.old_total_sent.clone());
|
|
||||||
averages.push(self.avg_sent.clone());
|
averages.push(self.avg_sent.clone());
|
||||||
totals.push(self.total_xact_time.clone());
|
totals.push(self.total_xact_time.clone());
|
||||||
old_totals.push(self.old_total_xact_time.clone());
|
|
||||||
averages.push(self.avg_xact_time.clone());
|
averages.push(self.avg_xact_time.clone());
|
||||||
totals.push(self.total_query_time.clone());
|
totals.push(self.total_query_time.clone());
|
||||||
old_totals.push(self.old_total_query_time.clone());
|
|
||||||
averages.push(self.avg_query_time.clone());
|
averages.push(self.avg_query_time.clone());
|
||||||
totals.push(self.total_wait_time.clone());
|
totals.push(self.total_wait_time.clone());
|
||||||
old_totals.push(self.old_total_wait_time.clone());
|
|
||||||
averages.push(self.avg_wait_time.clone());
|
averages.push(self.avg_wait_time.clone());
|
||||||
totals.push(self.total_errors.clone());
|
totals.push(self.total_errors.clone());
|
||||||
old_totals.push(self.old_total_errors.clone());
|
|
||||||
averages.push(self.avg_errors.clone());
|
averages.push(self.avg_errors.clone());
|
||||||
|
|
||||||
(totals, averages, old_totals)
|
(totals, averages)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -139,17 +139,6 @@ impl ServerStats {
|
|||||||
self.address.stats.clone()
|
self.address.stats.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_address_stat_average_is_updated_status(&self) -> bool {
|
|
||||||
self.address.stats.averages_updated.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_address_stat_average_is_updated_status(&self, is_checked: bool) {
|
|
||||||
self.address
|
|
||||||
.stats
|
|
||||||
.averages_updated
|
|
||||||
.store(is_checked, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper methods for show_servers
|
// Helper methods for show_servers
|
||||||
pub fn pool_name(&self) -> String {
|
pub fn pool_name(&self) -> String {
|
||||||
self.pool_stats.database()
|
self.pool_stats.database()
|
||||||
|
|||||||
@@ -14,12 +14,11 @@ describe "Admin" do
|
|||||||
describe "SHOW STATS" do
|
describe "SHOW STATS" do
|
||||||
context "clients connect and make one query" do
|
context "clients connect and make one query" do
|
||||||
it "updates *_query_time and *_wait_time" do
|
it "updates *_query_time and *_wait_time" do
|
||||||
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
connection = PG::connect("#{pgcat_conn_str}?application_name=one_query")
|
||||||
connections.each do |c|
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
end
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
sleep(1)
|
connection.close
|
||||||
connections.map(&:close)
|
|
||||||
|
|
||||||
# wait for averages to be calculated, we shouldn't do this too often
|
# wait for averages to be calculated, we shouldn't do this too often
|
||||||
sleep(15.5)
|
sleep(15.5)
|
||||||
@@ -27,7 +26,7 @@ describe "Admin" do
|
|||||||
results = admin_conn.async_exec("SHOW STATS")[0]
|
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||||
admin_conn.close
|
admin_conn.close
|
||||||
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
||||||
expect(results["avg_query_time"].to_i).to be_within(20).of(50)
|
expect(results["avg_query_time"].to_i).to_not eq(0)
|
||||||
|
|
||||||
expect(results["total_wait_time"].to_i).to_not eq(0)
|
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||||
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||||
@@ -72,17 +71,15 @@ describe "Admin" do
|
|||||||
|
|
||||||
context "client connects but issues no queries" do
|
context "client connects but issues no queries" do
|
||||||
it "only affects cl_idle stats" do
|
it "only affects cl_idle stats" do
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
|
||||||
|
|
||||||
before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
|
|
||||||
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
expect(results["cl_idle"]).to eq("20")
|
expect(results["cl_idle"]).to eq("20")
|
||||||
expect(results["sv_idle"]).to eq(before_test)
|
expect(results["sv_idle"]).to eq("1")
|
||||||
|
|
||||||
connections.map(&:close)
|
connections.map(&:close)
|
||||||
sleep(1.1)
|
sleep(1.1)
|
||||||
@@ -90,7 +87,7 @@ describe "Admin" do
|
|||||||
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
expect(results["sv_idle"]).to eq(before_test)
|
expect(results["sv_idle"]).to eq("1")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -41,24 +41,7 @@ module Helpers
|
|||||||
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
||||||
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
||||||
},
|
},
|
||||||
"users" => { "0" => user },
|
"users" => { "0" => user }
|
||||||
"plugins" => {
|
|
||||||
"intercept" => {
|
|
||||||
"enabled" => true,
|
|
||||||
"queries" => {
|
|
||||||
"0" => {
|
|
||||||
"query" => "select current_database() as a, current_schemas(false) as b",
|
|
||||||
"schema" => [
|
|
||||||
["a", "text"],
|
|
||||||
["b", "text"],
|
|
||||||
],
|
|
||||||
"result" => [
|
|
||||||
["${DATABASE}", "{public}"],
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pgcat.update_config(pgcat_cfg)
|
pgcat.update_config(pgcat_cfg)
|
||||||
|
|||||||
@@ -241,18 +241,6 @@ describe "Miscellaneous" do
|
|||||||
|
|
||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
||||||
end
|
end
|
||||||
|
|
||||||
it "Resets server roles correctly" do
|
|
||||||
10.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
conn.async_exec("SELECT 1")
|
|
||||||
conn.async_exec("SET statement_timeout to 5000")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_query("RESET ROLE")).to eq(10)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
context "transaction mode" do
|
context "transaction mode" do
|
||||||
|
|||||||
@@ -1,14 +0,0 @@
|
|||||||
require_relative 'spec_helper'
|
|
||||||
|
|
||||||
|
|
||||||
describe "Plugins" do
|
|
||||||
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
|
|
||||||
|
|
||||||
context "intercept" do
|
|
||||||
it "will intercept an intellij query" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
res = conn.exec("select current_database() as a, current_schemas(false) as b")
|
|
||||||
expect(res.values).to eq([["sharded_db", "{public}"]])
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
Reference in New Issue
Block a user