mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
Compare commits
20 Commits
levkk-auth
...
levkk-asyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd3623ff13 | ||
|
|
088f1a7dae | ||
|
|
ab7ac16974 | ||
|
|
62b2d994c1 | ||
|
|
66805d7e77 | ||
|
|
4ccc1e7fa3 | ||
|
|
3dae3d0777 | ||
|
|
a18eb42df5 | ||
|
|
6aacf1fa19 | ||
|
|
8e99e65215 | ||
|
|
5dfbc102a9 | ||
|
|
bae12fca99 | ||
|
|
421c5d4b64 | ||
|
|
d568739db9 | ||
|
|
692353c839 | ||
|
|
a62f6b0eea | ||
|
|
89e15f09b5 | ||
|
|
7ddd23b514 | ||
|
|
faa9c1f64a | ||
|
|
9094988491 |
@@ -39,7 +39,7 @@ log_client_connections = false
|
||||
log_client_disconnections = false
|
||||
|
||||
# Reload config automatically if it changes.
|
||||
autoreload = true
|
||||
autoreload = 15000
|
||||
|
||||
# TLS
|
||||
tls_certificate = ".circleci/server.cert"
|
||||
|
||||
@@ -110,6 +110,10 @@ python3 tests/python/tests.py || exit 1
|
||||
|
||||
start_pgcat "info"
|
||||
|
||||
python3 tests/python/async_test.py
|
||||
|
||||
start_pgcat "info"
|
||||
|
||||
# Admin tests
|
||||
export PGPASSWORD=admin_pass
|
||||
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
||||
|
||||
14
.editorconfig
Normal file
14
.editorconfig
Normal file
@@ -0,0 +1,14 @@
|
||||
root = true
|
||||
|
||||
[*]
|
||||
trim_trailing_whitespace = true
|
||||
insert_final_newline = true
|
||||
|
||||
[*.rs]
|
||||
indent_style = space
|
||||
indent_size = 4
|
||||
max_line_length = 120
|
||||
|
||||
[*.toml]
|
||||
indent_style = space
|
||||
indent_size = 2
|
||||
125
CONFIG.md
125
CONFIG.md
@@ -108,7 +108,7 @@ If we should log client disconnections
|
||||
### autoreload
|
||||
```
|
||||
path: general.autoreload
|
||||
default: false
|
||||
default: 15000
|
||||
```
|
||||
|
||||
When set to true, PgCat reloads configs if it detects a change in the config file.
|
||||
@@ -152,7 +152,7 @@ default: <UNSET>
|
||||
example: "server.cert"
|
||||
```
|
||||
|
||||
Path to TLS Certficate file to use for TLS connections
|
||||
Path to TLS Certificate file to use for TLS connections
|
||||
|
||||
### tls_private_key
|
||||
```
|
||||
@@ -175,41 +175,11 @@ Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DAT
|
||||
### admin_password
|
||||
```
|
||||
path: general.admin_password
|
||||
default: <UNSET>
|
||||
default: "admin_pass"
|
||||
```
|
||||
|
||||
Password to access the virtual administrative database
|
||||
|
||||
### auth_query (experimental)
|
||||
```
|
||||
path: general.auth_query
|
||||
default: <UNSET>
|
||||
```
|
||||
|
||||
Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
|
||||
established using the database configured in the pool. This parameter is inherited by every pool
|
||||
and can be redefined in pool configuration.
|
||||
|
||||
### auth_query_user (experimental)
|
||||
```
|
||||
path: general.auth_query_user
|
||||
default: <UNSET>
|
||||
```
|
||||
|
||||
User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
|
||||
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
|
||||
This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
|
||||
### auth_query_password (experimental)
|
||||
```
|
||||
path: general.auth_query_password
|
||||
default: <UNSET>
|
||||
```
|
||||
|
||||
Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
|
||||
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
|
||||
This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
|
||||
## `pools.<pool_name>` Section
|
||||
|
||||
### pool_mode
|
||||
@@ -243,7 +213,7 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
|
||||
`replica` round-robin between replicas only without touching the primary,
|
||||
`primary` all queries go to the primary unless otherwise specified.
|
||||
|
||||
### query_parser_enabled (experimental)
|
||||
### query_parser_enabled
|
||||
```
|
||||
path: pools.<pool_name>.query_parser_enabled
|
||||
default: true
|
||||
@@ -264,7 +234,7 @@ If the query parser is enabled and this setting is enabled, the primary will be
|
||||
load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
queries. The primary can always be explicitly selected with our custom protocol.
|
||||
|
||||
### sharding_key_regex (experimental)
|
||||
### sharding_key_regex
|
||||
```
|
||||
path: pools.<pool_name>.sharding_key_regex
|
||||
default: <UNSET>
|
||||
@@ -286,7 +256,40 @@ Current options:
|
||||
`pg_bigint_hash`: PARTITION BY HASH (Postgres hashing function)
|
||||
`sha1`: A hashing function based on SHA1
|
||||
|
||||
### automatic_sharding_key (experimental)
|
||||
### auth_query
|
||||
```
|
||||
path: pools.<pool_name>.auth_query
|
||||
default: <UNSET>
|
||||
example: "SELECT $1"
|
||||
```
|
||||
|
||||
Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
|
||||
established using the database configured in the pool. This parameter is inherited by every pool
|
||||
and can be redefined in pool configuration.
|
||||
|
||||
### auth_query_user
|
||||
```
|
||||
path: pools.<pool_name>.auth_query_user
|
||||
default: <UNSET>
|
||||
example: "sharding_user"
|
||||
```
|
||||
|
||||
User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
|
||||
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
|
||||
This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
|
||||
### auth_query_password
|
||||
```
|
||||
path: pools.<pool_name>.auth_query_password
|
||||
default: <UNSET>
|
||||
example: "sharding_user"
|
||||
```
|
||||
|
||||
Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
|
||||
specified in `auth_query_user`. The connection will be established using the database configured in the pool.
|
||||
This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
|
||||
### automatic_sharding_key
|
||||
```
|
||||
path: pools.<pool_name>.automatic_sharding_key
|
||||
default: <UNSET>
|
||||
@@ -311,30 +314,6 @@ default: 3000
|
||||
|
||||
Connect timeout can be overwritten in the pool
|
||||
|
||||
### auth_query (experimental)
|
||||
```
|
||||
path: general.auth_query
|
||||
default: <UNSET>
|
||||
```
|
||||
|
||||
Auth query can be overwritten in the pool
|
||||
|
||||
### auth_query_user (experimental)
|
||||
```
|
||||
path: general.auth_query_user
|
||||
default: <UNSET>
|
||||
```
|
||||
|
||||
Auth query user can be overwritten in the pool
|
||||
|
||||
### auth_query_password (experimental)
|
||||
```
|
||||
path: general.auth_query_password
|
||||
default: <UNSET>
|
||||
```
|
||||
|
||||
Auth query password can be overwritten in the pool
|
||||
|
||||
## `pools.<pool_name>.users.<user_index>` Section
|
||||
|
||||
### username
|
||||
@@ -343,7 +322,8 @@ path: pools.<pool_name>.users.<user_index>.username
|
||||
default: "sharding_user"
|
||||
```
|
||||
|
||||
Postgresql username
|
||||
PostgreSQL username used to authenticate the user and connect to the server
|
||||
if `server_username` is not set.
|
||||
|
||||
### password
|
||||
```
|
||||
@@ -351,7 +331,26 @@ path: pools.<pool_name>.users.<user_index>.password
|
||||
default: "sharding_user"
|
||||
```
|
||||
|
||||
Postgresql password
|
||||
PostgreSQL password used to authenticate the user and connect to the server
|
||||
if `server_password` is not set.
|
||||
|
||||
### server_username
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.server_username
|
||||
default: <UNSET>
|
||||
example: "another_user"
|
||||
```
|
||||
|
||||
PostgreSQL username used to connect to the server.
|
||||
|
||||
### server_password
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.server_password
|
||||
default: <UNSET>
|
||||
example: "another_password"
|
||||
```
|
||||
|
||||
PostgreSQL password used to connect to the server.
|
||||
|
||||
### pool_size
|
||||
```
|
||||
@@ -382,7 +381,7 @@ default: [["127.0.0.1", 5432, "primary"], ["localhost", 5432, "replica"]]
|
||||
|
||||
Array of servers in the shard, each server entry is an array of `[host, port, role]`
|
||||
|
||||
### mirrors (experimental)
|
||||
### mirrors
|
||||
```
|
||||
path: pools.<pool_name>.shards.<shard_index>.mirrors
|
||||
default: <UNSET>
|
||||
|
||||
119
Cargo.lock
generated
119
Cargo.lock
generated
@@ -4,9 +4,9 @@ version = 3
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
version = "0.7.20"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac"
|
||||
checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
@@ -54,12 +54,6 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.21.0"
|
||||
@@ -283,9 +277,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "531ac96c6ff5fd7c62263c5e3c67a603af4fcaee2e1a0ae5565ba3a11e69e549"
|
||||
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
@@ -298,9 +292,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac"
|
||||
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
@@ -308,15 +302,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "futures-core"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd"
|
||||
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
|
||||
|
||||
[[package]]
|
||||
name = "futures-executor"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1997dd9df74cdac935c76252744c1ed5794fac083242ea4fe77ef3ed60ba0f83"
|
||||
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-task",
|
||||
@@ -325,38 +319,38 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91"
|
||||
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6"
|
||||
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2"
|
||||
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
|
||||
|
||||
[[package]]
|
||||
name = "futures-task"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879"
|
||||
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
version = "0.3.27"
|
||||
version = "0.3.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab"
|
||||
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
@@ -393,9 +387,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.15"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4"
|
||||
checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@@ -482,9 +476,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.25"
|
||||
version = "0.14.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cc5e554ff619822309ffd57d8734d77cd5ce6238bc956f037ea06c58238c9899"
|
||||
checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
@@ -745,12 +739,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pgcat"
|
||||
version = "1.0.0"
|
||||
version = "1.0.1"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
"atomic_enum",
|
||||
"base64 0.21.0",
|
||||
"base64",
|
||||
"bb8",
|
||||
"bytes",
|
||||
"chrono",
|
||||
@@ -840,11 +834,11 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
version = "0.6.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c"
|
||||
checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -921,9 +915,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.7.3"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d"
|
||||
checksum = "ac6cf59af1067a3fb53fbe5c88c053764e930f932be1d71d3ffe032cbe147f59"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -932,9 +926,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.6.29"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
||||
checksum = "b6868896879ba532248f33598de5181522d8b3d9d724dfd230911e1a7d4822f5"
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
@@ -967,14 +961,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.8"
|
||||
version = "0.21.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
|
||||
checksum = "07180898a28ed6a7f7ba2311594308f595e3dd2e3c3812fa0a80a47b45f17e5d"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring",
|
||||
"rustls-webpki",
|
||||
"sct",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -983,7 +977,17 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
|
||||
dependencies = [
|
||||
"base64 0.21.0",
|
||||
"base64",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.100.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1010,15 +1014,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.159"
|
||||
version = "1.0.160"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065"
|
||||
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.159"
|
||||
version = "1.0.160"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4c614d17805b093df4b147b51339e7e44bf05ef59fba1e45d83500bcfb4d8585"
|
||||
checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1104,9 +1108,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.32.0"
|
||||
version = "0.33.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0366f270dbabb5cc2e4c88427dc4c08bba144f81e32fbd459a013f26a4d16aa0"
|
||||
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
|
||||
dependencies = [
|
||||
"log",
|
||||
]
|
||||
@@ -1223,13 +1227,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.23.4"
|
||||
version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
|
||||
checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5"
|
||||
dependencies = [
|
||||
"rustls",
|
||||
"tokio",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1443,16 +1446,6 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "pgcat"
|
||||
version = "1.0.0"
|
||||
version = "1.0.1"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
sqlparser = "0.32.0"
|
||||
sqlparser = "0.33.0"
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
env_logger = "0.10"
|
||||
@@ -28,7 +28,7 @@ hmac = "0.12"
|
||||
sha2 = "0.10"
|
||||
base64 = "0.21"
|
||||
stringprep = "0.1"
|
||||
tokio-rustls = "0.23"
|
||||
tokio-rustls = "0.24"
|
||||
rustls-pemfile = "1"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
phf = { version = "0.11.1", features = ["macros"] }
|
||||
@@ -37,7 +37,7 @@ futures = "0.3"
|
||||
socket2 = { version = "0.4.7", features = ["all"] }
|
||||
nix = "0.26.2"
|
||||
atomic_enum = "0.2.0"
|
||||
postgres-protocol = "0.6.4"
|
||||
postgres-protocol = "0.6.5"
|
||||
fallible-iterator = "0.2"
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
|
||||
46
README.md
46
README.md
@@ -21,21 +21,53 @@ PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load bal
|
||||
| Client TLS | **Stable** | Clients can connect to the pooler using TLS/SSL. |
|
||||
| Client/Server authentication | **Stable** | Clients can connect using MD5 authentication, supported by `libpq` and all Postgres client drivers. PgCat can connect to Postgres using MD5 and SCRAM-SHA-256. |
|
||||
| Live configuration reloading | **Stable** | Identical to PgBouncer; all settings can be reloaded dynamically (except `host` and `port`). |
|
||||
| Auth passthrough | **Stable** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file.|
|
||||
| Sharding using extended SQL syntax | **Experimental** | Clients can dynamically configure the pooler to route queries to specific shards. |
|
||||
| Sharding using comments parsing/Regex | **Experimental** | Clients can include shard information (sharding key, shard ID) in the query comments. |
|
||||
| Automatic sharding | **Experimental** | PgCat can parse queries, detect sharding keys automatically, and route queries to the correct shard. |
|
||||
| Mirroring | **Experimental** | Mirror queries between multiple databases in order to test servers with realistic production traffic. |
|
||||
| Auth passthrough | **Experimental** | MD5 password authentication can be configured to use an `auth_query` so no cleartext passwords are needed in the config file. |
|
||||
|
||||
|
||||
## Status
|
||||
|
||||
PgCat is stable and used in production to serve hundreds of thousands of queries per second. Some features remain experimental and are being actively developed. They are optional and can be enabled through configuration.
|
||||
PgCat is stable and used in production to serve hundreds of thousands of queries per second.
|
||||
|
||||
| | |
|
||||
|-|-|
|
||||
|<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f"><img src="./images/instacart.webp" height="70" width="auto"></a>|<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second"><img src="./images/postgresml.webp" height="70" width="auto"></a>|
|
||||
| [Instacart](https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f) | [PostgresML](https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second) |
|
||||
<table>
|
||||
<tr>
|
||||
<td>
|
||||
<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f">
|
||||
<img src="./images/instacart.webp" height="70" width="auto">
|
||||
</a>
|
||||
</td>
|
||||
<td>
|
||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
|
||||
<img src="./images/postgresml.webp" height="70" width="auto">
|
||||
</a>
|
||||
</td>
|
||||
<td>
|
||||
<a href="https://onesignal.com">
|
||||
<img src="./images/one_signal.webp" height="70" width="auto">
|
||||
</a>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<a href="https://tech.instacart.com/adopting-pgcat-a-nextgen-postgres-proxy-3cf284e68c2f">
|
||||
Instacart
|
||||
</a>
|
||||
</td>
|
||||
<td>
|
||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
|
||||
PostgresML
|
||||
</a>
|
||||
</td>
|
||||
<td>
|
||||
OneSignal
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
Some features remain experimental and are being actively developed. They are optional and can be enabled through configuration.
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -99,7 +131,7 @@ You can open a Docker development environment where you can debug tests easier.
|
||||
./dev/script/console
|
||||
```
|
||||
|
||||
This will open a terminal in an environment similar to that used in tests. In there, you can compile the pooler, run tests, do some debugging with the test environment, etc. Objects compiled inside the contaner (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have on your machine.
|
||||
This will open a terminal in an environment similar to that used in tests. In there, you can compile the pooler, run tests, do some debugging with the test environment, etc. Objects compiled inside the container (and bundled gems) will be placed in `dev/cache` so they don't interfere with what you have on your machine.
|
||||
|
||||
## Usage
|
||||
|
||||
|
||||
@@ -38,9 +38,6 @@ log_client_connections = false
|
||||
# If we should log client disconnections
|
||||
log_client_disconnections = false
|
||||
|
||||
# Reload config automatically if it changes.
|
||||
autoreload = false
|
||||
|
||||
# TLS
|
||||
# tls_certificate = "server.cert"
|
||||
# tls_private_key = "server.key"
|
||||
@@ -76,7 +73,7 @@ query_parser_enabled = true
|
||||
|
||||
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
|
||||
# load balancing of read queries. Otherwise, the primary will only be used for write
|
||||
# queries. The primary can always be explicitely selected with our custom protocol.
|
||||
# queries. The primary can always be explicitly selected with our custom protocol.
|
||||
primary_reads_enabled = true
|
||||
|
||||
# So what if you wanted to implement a different hashing function,
|
||||
|
||||
BIN
images/one_signal.webp
Normal file
BIN
images/one_signal.webp
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 16 KiB |
38
pgcat.toml
38
pgcat.toml
@@ -45,7 +45,7 @@ log_client_connections = false
|
||||
log_client_disconnections = false
|
||||
|
||||
# When set to true, PgCat reloads configs if it detects a change in the config file.
|
||||
autoreload = false
|
||||
autoreload = 15000
|
||||
|
||||
# Number of worker threads the Runtime will use (4 by default).
|
||||
worker_threads = 5
|
||||
@@ -57,7 +57,7 @@ tcp_keepalives_count = 5
|
||||
# Number of seconds between keepalive packets.
|
||||
tcp_keepalives_interval = 5
|
||||
|
||||
# Path to TLS Certficate file to use for TLS connections
|
||||
# Path to TLS Certificate file to use for TLS connections
|
||||
# tls_certificate = "server.cert"
|
||||
# Path to TLS private key file to use for TLS connections
|
||||
# tls_private_key = "server.key"
|
||||
@@ -113,6 +113,21 @@ primary_reads_enabled = true
|
||||
# `sha1`: A hashing function based on SHA1
|
||||
sharding_function = "pg_bigint_hash"
|
||||
|
||||
# Query to be sent to servers to obtain the hash used for md5 authentication. The connection will be
|
||||
# established using the database configured in the pool. This parameter is inherited by every pool
|
||||
# and can be redefined in pool configuration.
|
||||
# auth_query = "SELECT $1"
|
||||
|
||||
# User to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
|
||||
# specified in `auth_query_user`. The connection will be established using the database configured in the pool.
|
||||
# This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
# auth_query_user = "sharding_user"
|
||||
|
||||
# Password to be used for connecting to servers to obtain the hash used for md5 authentication by sending the query
|
||||
# specified in `auth_query_user`. The connection will be established using the database configured in the pool.
|
||||
# This parameter is inherited by every pool and can be redefined in pool configuration.
|
||||
# auth_query_password = "sharding_user"
|
||||
|
||||
# Automatically parse this from queries and route queries to the right shard!
|
||||
# automatic_sharding_key = "data.id"
|
||||
|
||||
@@ -123,17 +138,30 @@ idle_timeout = 40000
|
||||
connect_timeout = 3000
|
||||
|
||||
# User configs are structured as pool.<pool_name>.users.<user_index>
|
||||
# This secion holds the credentials for users that may connect to this cluster
|
||||
# This section holds the credentials for users that may connect to this cluster
|
||||
[pools.sharded_db.users.0]
|
||||
# Postgresql username
|
||||
# PostgreSQL username used to authenticate the user and connect to the server
|
||||
# if `server_username` is not set.
|
||||
username = "sharding_user"
|
||||
# Postgresql password
|
||||
|
||||
# PostgreSQL password used to authenticate the user and connect to the server
|
||||
# if `server_password` is not set.
|
||||
password = "sharding_user"
|
||||
|
||||
pool_mode = "session"
|
||||
|
||||
# PostgreSQL username used to connect to the server.
|
||||
# server_username = "another_user"
|
||||
|
||||
# PostgreSQL password used to connect to the server.
|
||||
# server_password = "another_password"
|
||||
|
||||
# Maximum number of server connections that can be established for this user
|
||||
# The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||
# is the sum of pool_size across all users.
|
||||
pool_size = 9
|
||||
|
||||
|
||||
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
# 0 means it is disabled.
|
||||
statement_timeout = 0
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::errors::Error;
|
||||
use crate::pool::ConnectionPool;
|
||||
use crate::server::Server;
|
||||
use log::debug;
|
||||
|
||||
@@ -71,25 +72,34 @@ impl AuthPassthrough {
|
||||
let auth_user = crate::config::User {
|
||||
username: self.user.clone(),
|
||||
password: Some(self.password.clone()),
|
||||
server_username: None,
|
||||
server_password: None,
|
||||
pool_size: 1,
|
||||
statement_timeout: 0,
|
||||
pool_mode: None,
|
||||
};
|
||||
|
||||
let user = &address.username;
|
||||
|
||||
debug!("Connecting to server to obtain auth hashes.");
|
||||
debug!("Connecting to server to obtain auth hashes");
|
||||
|
||||
let auth_query = self.query.replace("$1", user);
|
||||
|
||||
match Server::exec_simple_query(address, &auth_user, &auth_query).await {
|
||||
Ok(password_data) => {
|
||||
if password_data.len() == 2 && password_data.first().unwrap() == user {
|
||||
if let Some(stripped_hash) = password_data.last().unwrap().to_string().strip_prefix("md5") {
|
||||
Ok(stripped_hash.to_string())
|
||||
}
|
||||
else {
|
||||
Err(Error::AuthPassthroughError(
|
||||
"Obtained hash from auth_query does not seem to be in md5 format.".to_string(),
|
||||
))
|
||||
}
|
||||
if let Some(stripped_hash) = password_data
|
||||
.last()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
.strip_prefix("md5") {
|
||||
Ok(stripped_hash.to_string())
|
||||
}
|
||||
else {
|
||||
Err(Error::AuthPassthroughError(
|
||||
"Obtained hash from auth_query does not seem to be in md5 format.".to_string(),
|
||||
))
|
||||
}
|
||||
} else {
|
||||
Err(Error::AuthPassthroughError(
|
||||
"Data obtained from query does not follow the scheme 'user','hash'."
|
||||
@@ -98,10 +108,25 @@ impl AuthPassthrough {
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
Err(Error::AuthPassthroughError(
|
||||
format!("Error trying to obtain password from auth_query, ignoring hash for user '{}'. Error: {:?}",
|
||||
user, err)))
|
||||
Err(Error::AuthPassthroughError(
|
||||
format!("Error trying to obtain password from auth_query, ignoring hash for user '{}'. Error: {:?}",
|
||||
user, err))
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn refetch_auth_hash(pool: &ConnectionPool) -> Result<String, Error> {
|
||||
let address = pool.address(0, 0);
|
||||
if let Some(apt) = AuthPassthrough::from_pool_settings(&pool.settings) {
|
||||
let hash = apt.fetch_hash(address).await?;
|
||||
|
||||
return Ok(hash);
|
||||
}
|
||||
|
||||
Err(Error::ClientError(format!(
|
||||
"Could not obtain hash for {{ username: {:?}, database: {:?} }}. Auth passthrough not enabled.",
|
||||
address.username, address.database
|
||||
)))
|
||||
}
|
||||
|
||||
201
src/client.rs
201
src/client.rs
@@ -1,4 +1,4 @@
|
||||
use crate::errors::Error;
|
||||
use crate::errors::{ClientIdentifier, Error};
|
||||
use crate::pool::BanReason;
|
||||
/// Handle clients by pretending to be a PostgreSQL server.
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
@@ -12,7 +12,7 @@ use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::admin::{generate_server_info_for_admin, handle_admin};
|
||||
use crate::auth_passthrough::AuthPassthrough;
|
||||
use crate::auth_passthrough::refetch_auth_hash;
|
||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
||||
use crate::constants::*;
|
||||
use crate::messages::*;
|
||||
@@ -202,7 +202,7 @@ pub async fn client_entrypoint(
|
||||
// Client probably disconnected rejecting our plain text connection.
|
||||
Ok((ClientConnectionType::Tls, _))
|
||||
| Ok((ClientConnectionType::CancelQuery, _)) => Err(Error::ProtocolSyncError(
|
||||
format!("Bad postgres client (plain)"),
|
||||
"Bad postgres client (plain)".into(),
|
||||
)),
|
||||
|
||||
Err(err) => Err(err),
|
||||
@@ -369,28 +369,14 @@ pub async fn startup_tls(
|
||||
}
|
||||
|
||||
// Bad Postgres client.
|
||||
Ok((ClientConnectionType::Tls, _)) | Ok((ClientConnectionType::CancelQuery, _)) => Err(
|
||||
Error::ProtocolSyncError(format!("Bad postgres client (tls)")),
|
||||
),
|
||||
Ok((ClientConnectionType::Tls, _)) | Ok((ClientConnectionType::CancelQuery, _)) => {
|
||||
Err(Error::ProtocolSyncError("Bad postgres client (tls)".into()))
|
||||
}
|
||||
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
async fn refetch_auth_hash(pool: &ConnectionPool) -> Result<String, Error> {
|
||||
let address = pool.address(0, 0);
|
||||
if let Some(apt) = AuthPassthrough::from_pool_settings(&pool.settings) {
|
||||
let hash = apt.fetch_hash(address).await?;
|
||||
|
||||
return Ok(hash);
|
||||
}
|
||||
|
||||
Err(Error::ClientError(format!(
|
||||
"Could not obtain hash for {{ username: {:?}, database: {:?} }}. Auth passthrough not enabled.",
|
||||
address.username, address.database
|
||||
)))
|
||||
}
|
||||
|
||||
impl<S, T> Client<S, T>
|
||||
where
|
||||
S: tokio::io::AsyncRead + std::marker::Unpin,
|
||||
@@ -418,7 +404,7 @@ where
|
||||
Some(user) => user,
|
||||
None => {
|
||||
return Err(Error::ClientError(
|
||||
"Missing user parameter on client startup".to_string(),
|
||||
"Missing user parameter on client startup".into(),
|
||||
))
|
||||
}
|
||||
};
|
||||
@@ -433,6 +419,8 @@ where
|
||||
None => "pgcat",
|
||||
};
|
||||
|
||||
let client_identifier = ClientIdentifier::new(&application_name, &username, &pool_name);
|
||||
|
||||
let admin = ["pgcat", "pgbouncer"]
|
||||
.iter()
|
||||
.filter(|db| *db == pool_name)
|
||||
@@ -463,7 +451,12 @@ where
|
||||
|
||||
let code = match read.read_u8().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading password code from client {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name))),
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password code".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// PasswordMessage
|
||||
@@ -476,19 +469,30 @@ where
|
||||
|
||||
let len = match read.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading password message length from client {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name))),
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message length".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||
|
||||
match read.read_exact(&mut password_response).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading password message from client {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name))),
|
||||
Err(_) => {
|
||||
return Err(Error::ClientSocketError(
|
||||
"password message".into(),
|
||||
client_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// Authenticate admin user.
|
||||
let (transaction_mode, server_info) = if admin {
|
||||
let config = get_config();
|
||||
|
||||
// Compare server and client hashes.
|
||||
let password_hash = md5_hash_password(
|
||||
&config.general.admin_username,
|
||||
@@ -497,10 +501,12 @@ where
|
||||
);
|
||||
|
||||
if password_hash != password_response {
|
||||
warn!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name);
|
||||
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
||||
|
||||
warn!("{}", error);
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(Error::ClientError(format!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name)));
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
(false, generate_server_info_for_admin())
|
||||
@@ -519,7 +525,10 @@ where
|
||||
)
|
||||
.await?;
|
||||
|
||||
return Err(Error::ClientError(format!("Invalid pool name {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name)));
|
||||
return Err(Error::ClientGeneralError(
|
||||
"Invalid pool name".into(),
|
||||
client_identifier,
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -530,16 +539,23 @@ where
|
||||
Some(md5_hash_password(username, password, &salt))
|
||||
} else {
|
||||
if !get_config().is_auth_query_configured() {
|
||||
return Err(Error::ClientError(format!("Client auth not possible, no cleartext password set for username: {:?} in config and auth passthrough (query_auth) is not set up.", username)));
|
||||
return Err(Error::ClientAuthImpossible(username.into()));
|
||||
}
|
||||
|
||||
let mut hash = (*pool.auth_hash.read()).clone();
|
||||
|
||||
if hash.is_none() {
|
||||
warn!("Query auth configured but no hash password found for pool {}. Will try to refetch it.", pool_name);
|
||||
warn!(
|
||||
"Query auth configured \
|
||||
but no hash password found \
|
||||
for pool {}. Will try to refetch it.",
|
||||
pool_name
|
||||
);
|
||||
|
||||
match refetch_auth_hash(&pool).await {
|
||||
Ok(fetched_hash) => {
|
||||
warn!("Password for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, obtained. Updating.", username, pool_name, application_name);
|
||||
warn!("Password for {}, obtained. Updating.", client_identifier);
|
||||
|
||||
{
|
||||
let mut pool_auth_hash = pool.auth_hash.write();
|
||||
*pool_auth_hash = Some(fetched_hash.clone());
|
||||
@@ -547,16 +563,12 @@ where
|
||||
|
||||
hash = Some(fetched_hash);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
return Err(
|
||||
Error::ClientError(
|
||||
format!("No cleartext password set, and no auth passthrough could not obtain the hash from server for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, the error was: {:?}",
|
||||
username,
|
||||
pool_name,
|
||||
application_name,
|
||||
err)
|
||||
)
|
||||
);
|
||||
return Err(Error::ClientAuthPassthroughError(
|
||||
err.to_string(),
|
||||
client_identifier,
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -570,20 +582,31 @@ where
|
||||
//
|
||||
// @TODO: we could end up fetching again the same password twice (see above).
|
||||
if password_hash.unwrap() != password_response {
|
||||
warn!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, will try to refetch it.", username, pool_name, application_name);
|
||||
warn!(
|
||||
"Invalid password {}, will try to refetch it.",
|
||||
client_identifier
|
||||
);
|
||||
|
||||
let fetched_hash = refetch_auth_hash(&pool).await?;
|
||||
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
||||
|
||||
// Ok password changed in server an auth is possible.
|
||||
if new_password_hash == password_response {
|
||||
warn!("Password for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, changed in server. Updating.", username, pool_name, application_name);
|
||||
warn!(
|
||||
"Password for {}, changed in server. Updating.",
|
||||
client_identifier
|
||||
);
|
||||
|
||||
{
|
||||
let mut pool_auth_hash = pool.auth_hash.write();
|
||||
*pool_auth_hash = Some(fetched_hash);
|
||||
}
|
||||
} else {
|
||||
wrong_password(&mut write, username).await?;
|
||||
return Err(Error::ClientError(format!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name)));
|
||||
return Err(Error::ClientGeneralError(
|
||||
"Invalid password".into(),
|
||||
client_identifier,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -753,9 +776,9 @@ where
|
||||
&mut self.write,
|
||||
"terminating connection due to administrator command"
|
||||
).await?;
|
||||
self.stats.disconnect();
|
||||
|
||||
return Ok(())
|
||||
self.stats.disconnect();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Admin clients ignore shutdown.
|
||||
@@ -909,7 +932,7 @@ where
|
||||
}
|
||||
|
||||
// Grab a server from the pool.
|
||||
let connection = match pool
|
||||
let mut connection = match pool
|
||||
.get(query_router.shard(), query_router.role(), &self.stats)
|
||||
.await
|
||||
{
|
||||
@@ -928,18 +951,32 @@ where
|
||||
error!("Got Sync message but failed to get a connection from the pool");
|
||||
self.buffer.clear();
|
||||
}
|
||||
|
||||
error_response(&mut self.write, "could not get connection from the pool")
|
||||
.await?;
|
||||
|
||||
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
|
||||
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
|
||||
error!(
|
||||
"Could not get connection from pool: \
|
||||
{{ \
|
||||
pool_name: {:?}, \
|
||||
username: {:?}, \
|
||||
shard: {:?}, \
|
||||
role: \"{:?}\", \
|
||||
error: \"{:?}\" \
|
||||
}}",
|
||||
self.pool_name,
|
||||
self.username,
|
||||
query_router.shard(),
|
||||
query_router.role(),
|
||||
err
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut reference = connection.0;
|
||||
let server = &mut *connection.0;
|
||||
let address = connection.1;
|
||||
let server = &mut *reference;
|
||||
|
||||
// Server is assigned to the client in case the client wants to
|
||||
// cancel a query later.
|
||||
@@ -962,6 +999,7 @@ where
|
||||
|
||||
// Set application_name.
|
||||
server.set_name(&self.application_name).await?;
|
||||
server.switch_async(false);
|
||||
|
||||
let mut initial_message = Some(message);
|
||||
|
||||
@@ -981,12 +1019,37 @@ where
|
||||
None => {
|
||||
trace!("Waiting for message inside transaction or in session mode");
|
||||
|
||||
match tokio::time::timeout(
|
||||
idle_client_timeout_duration,
|
||||
read_message(&mut self.read),
|
||||
)
|
||||
.await
|
||||
{
|
||||
let message = tokio::select! {
|
||||
message = tokio::time::timeout(
|
||||
idle_client_timeout_duration,
|
||||
read_message(&mut self.read),
|
||||
) => message,
|
||||
|
||||
server_message = server.recv() => {
|
||||
debug!("Got async message");
|
||||
|
||||
let server_message = match server_message {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
pool.ban(&address, BanReason::MessageReceiveFailed, Some(&self.stats));
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
match write_all_half(&mut self.write, &server_message).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match message {
|
||||
Ok(Ok(message)) => message,
|
||||
Ok(Err(err)) => {
|
||||
// Client disconnected inside a transaction.
|
||||
@@ -999,11 +1062,25 @@ where
|
||||
Err(_) => {
|
||||
// Client idle in transaction timeout
|
||||
error_response(&mut self.write, "idle transaction timeout").await?;
|
||||
error!("Client idle in transaction timeout: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\"}}", self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role());
|
||||
error!(
|
||||
"Client idle in transaction timeout: \
|
||||
{{ \
|
||||
pool_name: {}, \
|
||||
username: {}, \
|
||||
shard: {}, \
|
||||
role: \"{:?}\" \
|
||||
}}",
|
||||
self.pool_name,
|
||||
self.username,
|
||||
query_router.shard(),
|
||||
query_router.role()
|
||||
);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(message) => {
|
||||
initial_message = None;
|
||||
message
|
||||
@@ -1076,6 +1153,11 @@ where
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Close the prepared statement.
|
||||
'C' => {
|
||||
self.buffer.put(&message[..]);
|
||||
}
|
||||
|
||||
// Execute
|
||||
// Execute a prepared statement prepared in `P` and bound in `B`.
|
||||
'E' => {
|
||||
@@ -1084,9 +1166,14 @@ where
|
||||
|
||||
// Sync
|
||||
// Frontend (client) is asking for the query result now.
|
||||
'S' => {
|
||||
'S' | 'H' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
if code == 'H' {
|
||||
server.switch_async(true);
|
||||
debug!("Client requested flush, going async");
|
||||
}
|
||||
|
||||
self.buffer.put(&message[..]);
|
||||
|
||||
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
||||
|
||||
@@ -178,7 +178,10 @@ impl Address {
|
||||
pub struct User {
|
||||
pub username: String,
|
||||
pub password: Option<String>,
|
||||
pub server_username: Option<String>,
|
||||
pub server_password: Option<String>,
|
||||
pub pool_size: u32,
|
||||
pub pool_mode: Option<PoolMode>,
|
||||
#[serde(default)] // 0
|
||||
pub statement_timeout: u64,
|
||||
}
|
||||
@@ -188,8 +191,11 @@ impl Default for User {
|
||||
User {
|
||||
username: String::from("postgres"),
|
||||
password: None,
|
||||
server_username: None,
|
||||
server_password: None,
|
||||
pool_size: 15,
|
||||
statement_timeout: 0,
|
||||
pool_mode: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -201,7 +207,7 @@ pub struct General {
|
||||
pub host: String,
|
||||
|
||||
#[serde(default = "General::default_port")]
|
||||
pub port: i16,
|
||||
pub port: u16,
|
||||
|
||||
pub enable_prometheus_exporter: Option<bool>,
|
||||
pub prometheus_exporter_port: i16,
|
||||
@@ -243,8 +249,8 @@ pub struct General {
|
||||
#[serde(default = "General::default_worker_threads")]
|
||||
pub worker_threads: usize,
|
||||
|
||||
#[serde(default)] // False
|
||||
pub autoreload: bool,
|
||||
#[serde(default)] // None
|
||||
pub autoreload: Option<u64>,
|
||||
|
||||
pub tls_certificate: Option<String>,
|
||||
pub tls_private_key: Option<String>,
|
||||
@@ -261,7 +267,7 @@ impl General {
|
||||
"0.0.0.0".into()
|
||||
}
|
||||
|
||||
pub fn default_port() -> i16 {
|
||||
pub fn default_port() -> u16 {
|
||||
5432
|
||||
}
|
||||
|
||||
@@ -271,7 +277,7 @@ impl General {
|
||||
|
||||
// These keepalive defaults should detect a dead connection within 30 seconds.
|
||||
// Tokio defaults to disabling keepalives which keeps dead connections around indefinitely.
|
||||
// This can lead to permenant server pool exhaustion
|
||||
// This can lead to permanent server pool exhaustion
|
||||
pub fn default_tcp_keepalives_idle() -> u64 {
|
||||
5 // 5 seconds
|
||||
}
|
||||
@@ -333,7 +339,7 @@ impl Default for General {
|
||||
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
|
||||
log_client_connections: false,
|
||||
log_client_disconnections: false,
|
||||
autoreload: false,
|
||||
autoreload: None,
|
||||
tls_certificate: None,
|
||||
tls_private_key: None,
|
||||
admin_username: String::from("admin"),
|
||||
@@ -356,6 +362,7 @@ pub enum PoolMode {
|
||||
#[serde(alias = "session", alias = "Session")]
|
||||
Session,
|
||||
}
|
||||
|
||||
impl ToString for PoolMode {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
@@ -419,7 +426,7 @@ pub struct Pool {
|
||||
|
||||
pub shards: BTreeMap<String, Shard>,
|
||||
pub users: BTreeMap<String, User>,
|
||||
// Note, don't put simple fields below these configs. There's a compatability issue with TOML that makes it
|
||||
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
|
||||
// incompatible to have simple fields in TOML after complex objects. See
|
||||
// https://users.rust-lang.org/t/why-toml-to-string-get-error-valueaftertable/85903
|
||||
}
|
||||
@@ -816,8 +823,9 @@ impl Config {
|
||||
.to_string()
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Pool mode: {:?}",
|
||||
pool_name, pool_config.pool_mode
|
||||
"[pool: {}] Default pool mode: {}",
|
||||
pool_name,
|
||||
pool_config.pool_mode.to_string()
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Load Balancing mode: {:?}",
|
||||
@@ -868,7 +876,16 @@ impl Config {
|
||||
info!(
|
||||
"[pool: {}][user: {}] Statement timeout: {}",
|
||||
pool_name, user.1.username, user.1.statement_timeout
|
||||
)
|
||||
);
|
||||
info!(
|
||||
"[pool: {}][user: {}] Pool mode: {}",
|
||||
pool_name,
|
||||
user.1.username,
|
||||
match user.1.pool_mode {
|
||||
Some(pool_mode) => pool_mode.to_string(),
|
||||
None => pool_config.pool_mode.to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
102
src/errors.rs
102
src/errors.rs
@@ -1,13 +1,19 @@
|
||||
/// Errors.
|
||||
//! Errors.
|
||||
|
||||
/// Various errors.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
SocketError(String),
|
||||
ClientSocketError(String, ClientIdentifier),
|
||||
ClientGeneralError(String, ClientIdentifier),
|
||||
ClientAuthImpossible(String),
|
||||
ClientAuthPassthroughError(String, ClientIdentifier),
|
||||
ClientBadStartup,
|
||||
ProtocolSyncError(String),
|
||||
BadQuery(String),
|
||||
ServerError,
|
||||
ServerStartupError(String, ServerIdentifier),
|
||||
ServerAuthError(String, ServerIdentifier),
|
||||
BadConfig,
|
||||
AllServersDown,
|
||||
ClientError(String),
|
||||
@@ -18,3 +24,97 @@ pub enum Error {
|
||||
AuthError(String),
|
||||
AuthPassthroughError(String),
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
pub struct ClientIdentifier {
|
||||
pub application_name: String,
|
||||
pub username: String,
|
||||
pub pool_name: String,
|
||||
}
|
||||
|
||||
impl ClientIdentifier {
|
||||
pub fn new(application_name: &str, username: &str, pool_name: &str) -> ClientIdentifier {
|
||||
ClientIdentifier {
|
||||
application_name: application_name.into(),
|
||||
username: username.into(),
|
||||
pool_name: pool_name.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ClientIdentifier {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{{ application_name: {}, username: {}, pool_name: {} }}",
|
||||
self.application_name, self.username, self.pool_name
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
pub struct ServerIdentifier {
|
||||
pub username: String,
|
||||
pub database: String,
|
||||
}
|
||||
|
||||
impl ServerIdentifier {
|
||||
pub fn new(username: &str, database: &str) -> ServerIdentifier {
|
||||
ServerIdentifier {
|
||||
username: username.into(),
|
||||
database: database.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ServerIdentifier {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{{ username: {}, database: {} }}",
|
||||
self.username, self.database
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match &self {
|
||||
&Error::ClientSocketError(error, client_identifier) => write!(
|
||||
f,
|
||||
"Error reading {} from client {}",
|
||||
error, client_identifier
|
||||
),
|
||||
&Error::ClientGeneralError(error, client_identifier) => {
|
||||
write!(f, "{} {}", error, client_identifier)
|
||||
}
|
||||
&Error::ClientAuthImpossible(username) => write!(
|
||||
f,
|
||||
"Client auth not possible, \
|
||||
no cleartext password set for username: {} \
|
||||
in config and auth passthrough (query_auth) \
|
||||
is not set up.",
|
||||
username
|
||||
),
|
||||
&Error::ClientAuthPassthroughError(error, client_identifier) => write!(
|
||||
f,
|
||||
"No cleartext password set, \
|
||||
and no auth passthrough could not \
|
||||
obtain the hash from server for {}, \
|
||||
the error was: {}",
|
||||
client_identifier, error
|
||||
),
|
||||
&Error::ServerStartupError(error, server_identifier) => write!(
|
||||
f,
|
||||
"Error reading {} on server startup {}",
|
||||
error, server_identifier,
|
||||
),
|
||||
&Error::ServerAuthError(error, server_identifier) => {
|
||||
write!(f, "{} for {}", error, server_identifier,)
|
||||
}
|
||||
|
||||
// The rest can use Debug.
|
||||
err => write!(f, "{:?}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
34
src/main.rs
34
src/main.rs
@@ -79,6 +79,7 @@ mod stats;
|
||||
mod tls;
|
||||
|
||||
use crate::config::{get_config, reload_config, VERSION};
|
||||
use crate::messages::configure_socket;
|
||||
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||
use crate::prometheus::start_metric_server;
|
||||
use crate::stats::{Collector, Reporter, REPORTER};
|
||||
@@ -179,16 +180,19 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
stats_collector.collect().await;
|
||||
});
|
||||
|
||||
info!("Config autoreloader: {}", config.general.autoreload);
|
||||
info!("Config autoreloader: {}", match config.general.autoreload {
|
||||
Some(interval) => format!("{} ms", interval),
|
||||
None => "disabled".into(),
|
||||
});
|
||||
|
||||
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
||||
let autoreload_client_server_map = client_server_map.clone();
|
||||
if let Some(interval) = config.general.autoreload {
|
||||
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(interval));
|
||||
let autoreload_client_server_map = client_server_map.clone();
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
autoreload_interval.tick().await;
|
||||
if config.general.autoreload {
|
||||
info!("Automatically reloading config");
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
autoreload_interval.tick().await;
|
||||
debug!("Automatically reloading config");
|
||||
|
||||
if let Ok(changed) = reload_config(autoreload_client_server_map.clone()).await {
|
||||
if changed {
|
||||
@@ -196,8 +200,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
|
||||
#[cfg(windows)]
|
||||
let mut term_signal = win_signal::ctrl_close().unwrap();
|
||||
@@ -282,7 +288,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let drain_tx = drain_tx.clone();
|
||||
let client_server_map = client_server_map.clone();
|
||||
|
||||
let tls_certificate = config.general.tls_certificate.clone();
|
||||
let tls_certificate = get_config().general.tls_certificate.clone();
|
||||
|
||||
configure_socket(&socket);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let start = chrono::offset::Utc::now().naive_utc();
|
||||
@@ -293,7 +301,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
shutdown_rx,
|
||||
drain_tx,
|
||||
admin_only,
|
||||
tls_certificate.clone(),
|
||||
tls_certificate,
|
||||
config.general.log_client_connections,
|
||||
)
|
||||
.await
|
||||
@@ -301,7 +309,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
Ok(()) => {
|
||||
let duration = chrono::offset::Utc::now().naive_utc() - start;
|
||||
|
||||
if config.general.log_client_disconnections {
|
||||
if get_config().general.log_client_disconnections {
|
||||
info!(
|
||||
"Client {:?} disconnected, session duration: {}",
|
||||
addr,
|
||||
|
||||
@@ -404,7 +404,7 @@ pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
|
||||
let mut res = BytesMut::new();
|
||||
let mut row_desc = BytesMut::new();
|
||||
|
||||
// how many colums we are storing
|
||||
// how many columns we are storing
|
||||
row_desc.put_i16(columns.len() as i16);
|
||||
|
||||
for (name, data_type) in columns {
|
||||
|
||||
@@ -17,7 +17,7 @@ use log::{Level, Log, Metadata, Record, SetLoggerError};
|
||||
//
|
||||
// So to summarize, if no `STDOUT_LOG` env var is present, the logger is the default logger. If `STDOUT_LOG` is set, everything
|
||||
// but errors, that matches the log level set in the `STDOUT_LOG` env var is sent to stdout. You can have also some esoteric configuration
|
||||
// where you set `RUST_LOG=debug` and `STDOUT_LOG=info`, in here, erros will go to stderr, warns and infos to stdout and debugs to stderr.
|
||||
// where you set `RUST_LOG=debug` and `STDOUT_LOG=info`, in here, errors will go to stderr, warns and infos to stdout and debugs to stderr.
|
||||
//
|
||||
pub struct MultiLogger {
|
||||
stderr_logger: env_logger::Logger,
|
||||
|
||||
11
src/pool.rs
11
src/pool.rs
@@ -382,7 +382,10 @@ impl ConnectionPool {
|
||||
server_info: Arc::new(RwLock::new(BytesMut::new())),
|
||||
auth_hash: pool_auth_hash,
|
||||
settings: PoolSettings {
|
||||
pool_mode: pool_config.pool_mode,
|
||||
pool_mode: match user.pool_mode {
|
||||
Some(pool_mode) => pool_mode,
|
||||
None => pool_config.pool_mode,
|
||||
},
|
||||
load_balancing_mode: pool_config.load_balancing_mode,
|
||||
// shards: pool_config.shards.clone(),
|
||||
shards: shard_ids.len(),
|
||||
@@ -774,6 +777,7 @@ impl ConnectionPool {
|
||||
self.databases.len()
|
||||
}
|
||||
|
||||
/// Retrieve all bans for all servers.
|
||||
pub fn get_bans(&self) -> Vec<(Address, (BanReason, NaiveDateTime))> {
|
||||
let mut bans: Vec<(Address, (BanReason, NaiveDateTime))> = Vec::new();
|
||||
let guard = self.banlist.read();
|
||||
@@ -785,7 +789,7 @@ impl ConnectionPool {
|
||||
return bans;
|
||||
}
|
||||
|
||||
/// Get the address from the host url
|
||||
/// Get the address from the host url.
|
||||
pub fn get_addresses_from_host(&self, host: &str) -> Vec<Address> {
|
||||
let mut addresses = Vec::new();
|
||||
for shard in 0..self.shards() {
|
||||
@@ -824,10 +828,13 @@ impl ConnectionPool {
|
||||
&self.addresses[shard][server]
|
||||
}
|
||||
|
||||
/// Get server settings retrieved at connection setup.
|
||||
pub fn server_info(&self) -> BytesMut {
|
||||
self.server_info.read().clone()
|
||||
}
|
||||
|
||||
/// Calculate how many used connections in the pool
|
||||
/// for the given server.
|
||||
fn busy_connection_count(&self, address: &Address) -> u32 {
|
||||
let state = self.pool_state(address.shard, address.address_index);
|
||||
let idle = state.idle_connections;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/// Route queries automatically based on explicitely requested
|
||||
/// Route queries automatically based on explicitly requested
|
||||
/// or implied query characteristics.
|
||||
use bytes::{Buf, BytesMut};
|
||||
use log::{debug, error};
|
||||
|
||||
167
src/server.rs
167
src/server.rs
@@ -17,7 +17,7 @@ use tokio::net::{
|
||||
|
||||
use crate::config::{Address, User};
|
||||
use crate::constants::*;
|
||||
use crate::errors::Error;
|
||||
use crate::errors::{Error, ServerIdentifier};
|
||||
use crate::messages::*;
|
||||
use crate::mirrors::MirroringManager;
|
||||
use crate::pool::ClientServerMap;
|
||||
@@ -38,6 +38,7 @@ pub struct Server {
|
||||
|
||||
/// Our server response buffer. We buffer data before we give it to the client.
|
||||
buffer: BytesMut,
|
||||
is_async: bool,
|
||||
|
||||
/// Server information the server sent us over on startup.
|
||||
server_info: BytesMut,
|
||||
@@ -103,28 +104,52 @@ impl Server {
|
||||
trace!("Sending StartupMessage");
|
||||
|
||||
// StartupMessage
|
||||
startup(&mut stream, &user.username, database).await?;
|
||||
let username = match user.server_username {
|
||||
Some(ref server_username) => server_username,
|
||||
None => &user.username,
|
||||
};
|
||||
|
||||
let password = match user.server_password {
|
||||
Some(ref server_password) => Some(server_password),
|
||||
None => match user.password {
|
||||
Some(ref password) => Some(password),
|
||||
None => None,
|
||||
},
|
||||
};
|
||||
|
||||
startup(&mut stream, username, database).await?;
|
||||
|
||||
let mut server_info = BytesMut::new();
|
||||
let mut process_id: i32 = 0;
|
||||
let mut secret_key: i32 = 0;
|
||||
let server_identifier = ServerIdentifier::new(username, &database);
|
||||
|
||||
// We'll be handling multiple packets, but they will all be structured the same.
|
||||
// We'll loop here until this exchange is complete.
|
||||
let mut scram: Option<ScramSha256> = None;
|
||||
if let Some(password) = &user.password.clone() {
|
||||
scram = Some(ScramSha256::new(password));
|
||||
}
|
||||
let mut scram: Option<ScramSha256> = match password {
|
||||
Some(password) => Some(ScramSha256::new(password)),
|
||||
None => None,
|
||||
};
|
||||
|
||||
loop {
|
||||
let code = match stream.read_u8().await {
|
||||
Ok(code) => code as char,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading message code on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"message code".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let len = match stream.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading message len on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"message len".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Message: {}", code);
|
||||
@@ -135,7 +160,12 @@ impl Server {
|
||||
// Determine which kind of authentication is required, if any.
|
||||
let auth_code = match stream.read_i32().await {
|
||||
Ok(auth_code) => auth_code,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading auth code on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"auth code".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Auth: {}", auth_code);
|
||||
@@ -148,14 +178,18 @@ impl Server {
|
||||
|
||||
match stream.read_exact(&mut salt).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading salt on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"salt".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
match &user.password {
|
||||
match password {
|
||||
// Using plaintext password
|
||||
Some(password) => {
|
||||
md5_password(&mut stream, &user.username, password, &salt[..])
|
||||
.await?
|
||||
md5_password(&mut stream, username, password, &salt[..]).await?
|
||||
}
|
||||
|
||||
// Using auth passthrough, in this case we should already have a
|
||||
@@ -171,8 +205,12 @@ impl Server {
|
||||
&salt[..],
|
||||
)
|
||||
.await?,
|
||||
None =>
|
||||
return Err(Error::AuthError(format!("Auth passthrough (auth_query) failed and no user password is set in cleartext for {{ username: {:?}, database: {:?} }}", user.username, database)))
|
||||
None => return Err(
|
||||
Error::ServerAuthError(
|
||||
"Auth passthrough (auth_query) failed and no user password is set in cleartext".into(),
|
||||
server_identifier
|
||||
)
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -182,16 +220,28 @@ impl Server {
|
||||
|
||||
SASL => {
|
||||
if scram.is_none() {
|
||||
return Err(Error::AuthError(format!("SASL auth required and not password specified, auth passthrough (auth_query) method is currently unsupported for SASL auth {{ username: {:?}, database: {:?} }}", user.username, database)));
|
||||
return Err(Error::ServerAuthError(
|
||||
"SASL auth required and no password specified. \
|
||||
Auth passthrough (auth_query) method is currently \
|
||||
unsupported for SASL auth"
|
||||
.into(),
|
||||
server_identifier,
|
||||
));
|
||||
}
|
||||
|
||||
debug!("Starting SASL authentication");
|
||||
|
||||
let sasl_len = (len - 8) as usize;
|
||||
let mut sasl_auth = vec![0u8; sasl_len];
|
||||
|
||||
match stream.read_exact(&mut sasl_auth).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading sasl message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"sasl message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
|
||||
@@ -233,7 +283,12 @@ impl Server {
|
||||
|
||||
match stream.read_exact(&mut sasl_data).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading sasl cont message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"sasl cont message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let msg = BytesMut::from(&sasl_data[..]);
|
||||
@@ -254,7 +309,12 @@ impl Server {
|
||||
let mut sasl_final = vec![0u8; len as usize - 8];
|
||||
match stream.read_exact(&mut sasl_final).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading sasl final message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"sasl final message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
match scram
|
||||
@@ -284,7 +344,12 @@ impl Server {
|
||||
'E' => {
|
||||
let error_code = match stream.read_u8().await {
|
||||
Ok(error_code) => error_code,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading error code message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"error code message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Error: {}", error_code);
|
||||
@@ -300,7 +365,12 @@ impl Server {
|
||||
|
||||
match stream.read_exact(&mut error).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading error message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"error message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: the error message contains multiple fields; we can decode them and
|
||||
@@ -319,7 +389,12 @@ impl Server {
|
||||
|
||||
match stream.read_exact(&mut param).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading parameter status message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"parameter status message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// Save the parameter so we can pass it to the client later.
|
||||
@@ -336,12 +411,22 @@ impl Server {
|
||||
// See: <https://www.postgresql.org/docs/12/protocol-message-formats.html>.
|
||||
process_id = match stream.read_i32().await {
|
||||
Ok(id) => id,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading process id message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"process id message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
secret_key = match stream.read_i32().await {
|
||||
Ok(id) => id,
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading secret key message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"secret key message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -351,7 +436,12 @@ impl Server {
|
||||
|
||||
match stream.read_exact(&mut idle).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError(format!("Error reading transaction status message on server startup {{ username: {:?}, database: {:?} }}", user.username, database))),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
"transaction status message".into(),
|
||||
server_identifier,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let (read, write) = stream.into_split();
|
||||
@@ -361,6 +451,7 @@ impl Server {
|
||||
read: BufReader::new(read),
|
||||
write,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
is_async: false,
|
||||
server_info,
|
||||
process_id,
|
||||
secret_key,
|
||||
@@ -413,7 +504,7 @@ impl Server {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
error!("Could not connect to server: {}", err);
|
||||
return Err(Error::SocketError(format!("Error reading cancel message")));
|
||||
return Err(Error::SocketError("Error reading cancel message".into()));
|
||||
}
|
||||
};
|
||||
configure_socket(&stream);
|
||||
@@ -448,6 +539,16 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
/// Switch to async mode, flushing messages as soon
|
||||
/// as we receive them without buffering or waiting for "ReadyForQuery".
|
||||
pub fn switch_async(&mut self, on: bool) {
|
||||
if on {
|
||||
self.is_async = true;
|
||||
} else {
|
||||
self.is_async = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive data from the server in response to a client request.
|
||||
/// This method must be called multiple times while `self.is_data_available()` is true
|
||||
/// in order to receive all data the server has to offer.
|
||||
@@ -543,7 +644,10 @@ impl Server {
|
||||
// DataRow
|
||||
'D' => {
|
||||
// More data is available after this message, this is not the end of the reply.
|
||||
self.data_available = true;
|
||||
// If we're async, flush to client now.
|
||||
if !self.is_async {
|
||||
self.data_available = true;
|
||||
}
|
||||
|
||||
// Don't flush yet, the more we buffer, the faster this goes...up to a limit.
|
||||
if self.buffer.len() >= 8196 {
|
||||
@@ -556,7 +660,10 @@ impl Server {
|
||||
|
||||
// CopyOutResponse: copy is starting from the server to the client.
|
||||
'H' => {
|
||||
self.data_available = true;
|
||||
// If we're in async mode, flush now.
|
||||
if !self.is_async {
|
||||
self.data_available = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -576,6 +683,10 @@ impl Server {
|
||||
// Keep buffering until ReadyForQuery shows up.
|
||||
_ => (),
|
||||
};
|
||||
|
||||
if self.is_async {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let bytes = self.buffer.clone();
|
||||
|
||||
@@ -66,7 +66,7 @@ impl Reporter {
|
||||
CLIENT_STATS.write().insert(client_id, stats);
|
||||
}
|
||||
|
||||
/// Reports a client is disconecting from the pooler.
|
||||
/// Reports a client is disconnecting from the pooler.
|
||||
fn client_disconnecting(&self, client_id: i32) {
|
||||
CLIENT_STATS.write().remove(&client_id);
|
||||
}
|
||||
@@ -76,7 +76,7 @@ impl Reporter {
|
||||
fn server_register(&self, server_id: i32, stats: Arc<ServerStats>) {
|
||||
SERVER_STATS.write().insert(server_id, stats);
|
||||
}
|
||||
/// Reports a server connection is disconecting from the pooler.
|
||||
/// Reports a server connection is disconnecting from the pooler.
|
||||
fn server_disconnecting(&self, server_id: i32) {
|
||||
SERVER_STATS.write().remove(&server_id);
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ impl ClientStats {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reports a client is disconecting from the pooler and
|
||||
/// Reports a client is disconnecting from the pooler and
|
||||
/// update metrics on the corresponding pool.
|
||||
pub fn disconnect(&self) {
|
||||
self.reporter.client_disconnecting(self.client_id);
|
||||
@@ -140,7 +140,7 @@ impl ClientStats {
|
||||
self.error_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Reportes the time spent by a client waiting to get a healthy connection from the pool
|
||||
/// Reporters the time spent by a client waiting to get a healthy connection from the pool
|
||||
pub fn checkout_time(&self, microseconds: u64) {
|
||||
self.total_wait_time
|
||||
.fetch_add(microseconds, Ordering::Relaxed);
|
||||
|
||||
@@ -100,10 +100,9 @@ impl ServerStats {
|
||||
.server_idle(self.state.load(Ordering::Relaxed));
|
||||
|
||||
self.state.store(ServerState::Idle, Ordering::Relaxed);
|
||||
self.set_undefined_application();
|
||||
}
|
||||
|
||||
/// Reports a server connection is disconecting from the pooler.
|
||||
/// Reports a server connection is disconnecting from the pooler.
|
||||
/// Also updates metrics on the pool regarding server usage.
|
||||
pub fn disconnect(&self) {
|
||||
self.reporter.server_disconnecting(self.server_id);
|
||||
|
||||
60
tests/python/async_test.py
Normal file
60
tests/python/async_test.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import psycopg2
|
||||
import asyncio
|
||||
import asyncpg
|
||||
|
||||
PGCAT_HOST = "127.0.0.1"
|
||||
PGCAT_PORT = "6432"
|
||||
|
||||
|
||||
def regular_main():
|
||||
# Connect to the PostgreSQL database
|
||||
conn = psycopg2.connect(
|
||||
host=PGCAT_HOST,
|
||||
database="sharded_db",
|
||||
user="sharding_user",
|
||||
password="sharding_user",
|
||||
port=PGCAT_PORT,
|
||||
)
|
||||
|
||||
# Open a cursor to perform database operations
|
||||
cur = conn.cursor()
|
||||
|
||||
# Execute a SQL query
|
||||
cur.execute("SELECT 1")
|
||||
|
||||
# Fetch the results
|
||||
rows = cur.fetchall()
|
||||
|
||||
# Print the results
|
||||
for row in rows:
|
||||
print(row[0])
|
||||
|
||||
# Close the cursor and the database connection
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
async def main():
|
||||
# Connect to the PostgreSQL database
|
||||
conn = await asyncpg.connect(
|
||||
host=PGCAT_HOST,
|
||||
database="sharded_db",
|
||||
user="sharding_user",
|
||||
password="sharding_user",
|
||||
port=PGCAT_PORT,
|
||||
)
|
||||
|
||||
# Execute a SQL query
|
||||
for _ in range(25):
|
||||
rows = await conn.fetch("SELECT 1")
|
||||
|
||||
# Print the results
|
||||
for row in rows:
|
||||
print(row[0])
|
||||
|
||||
# Close the database connection
|
||||
await conn.close()
|
||||
|
||||
|
||||
regular_main()
|
||||
asyncio.run(main())
|
||||
@@ -1,2 +1,11 @@
|
||||
asyncio==3.4.3
|
||||
asyncpg==0.27.0
|
||||
black==23.3.0
|
||||
click==8.1.3
|
||||
mypy-extensions==1.0.0
|
||||
packaging==23.1
|
||||
pathspec==0.11.1
|
||||
platformdirs==3.2.0
|
||||
psutil==5.9.1
|
||||
psycopg2==2.9.3
|
||||
psutil==5.9.1
|
||||
tomli==2.0.1
|
||||
|
||||
@@ -37,9 +37,9 @@ describe "Admin" do
|
||||
describe "SHOW POOLS" do
|
||||
context "bad credentials" do
|
||||
it "does not change any stats" do
|
||||
bad_passsword_url = URI(pgcat_conn_str)
|
||||
bad_passsword_url.password = "wrong"
|
||||
expect { PG::connect("#{bad_passsword_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
|
||||
bad_password_url = URI(pgcat_conn_str)
|
||||
bad_password_url.password = "wrong"
|
||||
expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
|
||||
|
||||
sleep(1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
|
||||
259
tests/ruby/helpers/pg_socket.rb
Normal file
259
tests/ruby/helpers/pg_socket.rb
Normal file
@@ -0,0 +1,259 @@
|
||||
require 'socket'
|
||||
require 'digest/md5'
|
||||
|
||||
BACKEND_MESSAGE_CODES = {
|
||||
'Z' => "ReadyForQuery",
|
||||
'C' => "CommandComplete",
|
||||
'T' => "RowDescription",
|
||||
'D' => "DataRow",
|
||||
'1' => "ParseComplete",
|
||||
'2' => "BindComplete",
|
||||
'E' => "ErrorResponse",
|
||||
's' => "PortalSuspended",
|
||||
}
|
||||
|
||||
class PostgresSocket
|
||||
def initialize(host, port)
|
||||
@port = port
|
||||
@host = host
|
||||
@socket = TCPSocket.new @host, @port
|
||||
@parameters = {}
|
||||
@verbose = true
|
||||
end
|
||||
|
||||
def send_md5_password_message(username, password, salt)
|
||||
m = Digest::MD5.hexdigest(password + username)
|
||||
m = Digest::MD5.hexdigest(m + salt.map(&:chr).join(""))
|
||||
m = 'md5' + m
|
||||
bytes = (m.split("").map(&:ord) + [0]).flatten
|
||||
message_size = bytes.count + 4
|
||||
|
||||
message = []
|
||||
|
||||
message << 'p'.ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << bytes
|
||||
message.flatten!
|
||||
|
||||
|
||||
@socket.write(message.pack('C*'))
|
||||
end
|
||||
|
||||
def send_startup_message(username, database, password)
|
||||
message = []
|
||||
|
||||
message << [196608].pack('l>').unpack('CCCC') # 4
|
||||
message << "user".split('').map(&:ord) # 4, 8
|
||||
message << 0 # 1, 9
|
||||
message << username.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message << "database".split('').map(&:ord) # 8, 20
|
||||
message << 0 # 1, 21
|
||||
message << database.split('').map(&:ord) # 2, 23
|
||||
message << 0 # 1, 24
|
||||
message << 0 # 1, 25
|
||||
message.flatten!
|
||||
|
||||
total_message_size = message.size + 4
|
||||
|
||||
message_len = [total_message_size].pack('l>').unpack('CCCC')
|
||||
|
||||
@socket.write([message_len + message].flatten.pack('C*'))
|
||||
|
||||
sleep 0.1
|
||||
|
||||
read_startup_response(username, password)
|
||||
end
|
||||
|
||||
def read_startup_response(username, password)
|
||||
message_code, message_len = @socket.recv(5).unpack("al>")
|
||||
while message_code == 'R'
|
||||
auth_code = @socket.recv(4).unpack('l>').pop
|
||||
case auth_code
|
||||
when 5 # md5
|
||||
salt = @socket.recv(4).unpack('CCCC')
|
||||
send_md5_password_message(username, password, salt)
|
||||
message_code, message_len = @socket.recv(5).unpack("al>")
|
||||
when 0 # trust
|
||||
break
|
||||
end
|
||||
end
|
||||
loop do
|
||||
message_code, message_len = @socket.recv(5).unpack("al>")
|
||||
if message_code == 'Z'
|
||||
@socket.recv(1).unpack("a") # most likely I
|
||||
break # We are good to go
|
||||
end
|
||||
if message_code == 'S'
|
||||
actual_message = @socket.recv(message_len - 4).unpack("C*")
|
||||
k,v = actual_message.pack('U*').split(/\x00/)
|
||||
@parameters[k] = v
|
||||
end
|
||||
if message_code == 'K'
|
||||
process_id, secret_key = @socket.recv(message_len - 4).unpack("l>l>")
|
||||
@parameters["process_id"] = process_id
|
||||
@parameters["secret_key"] = secret_key
|
||||
end
|
||||
end
|
||||
return @parameters
|
||||
end
|
||||
|
||||
def cancel_query
|
||||
socket = TCPSocket.new @host, @port
|
||||
process_key = @parameters["process_id"]
|
||||
secret_key = @parameters["secret_key"]
|
||||
message = []
|
||||
message << [16].pack('l>').unpack('CCCC') # 4
|
||||
message << [80877102].pack('l>').unpack('CCCC') # 4
|
||||
message << [process_key.to_i].pack('l>').unpack('CCCC') # 4
|
||||
message << [secret_key.to_i].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
socket.write(message.flatten.pack('C*'))
|
||||
socket.close
|
||||
log "[F] Sent CancelRequest message"
|
||||
end
|
||||
|
||||
def send_query_message(query)
|
||||
query_size = query.length
|
||||
message_size = 1 + 4 + query_size
|
||||
message = []
|
||||
message << "Q".ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << query.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent Q message (#{query})"
|
||||
end
|
||||
|
||||
def send_parse_message(query)
|
||||
query_size = query.length
|
||||
message_size = 2 + 2 + 4 + query_size
|
||||
message = []
|
||||
message << "P".ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << query.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message << [0, 0]
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent P message (#{query})"
|
||||
end
|
||||
|
||||
def send_bind_message
|
||||
message = []
|
||||
message << "B".ord
|
||||
message << [12].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << 0 # unnamed statement
|
||||
message << [0, 0] # 2
|
||||
message << [0, 0] # 2
|
||||
message << [0, 0] # 2
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent B message"
|
||||
end
|
||||
|
||||
def send_describe_message(mode)
|
||||
message = []
|
||||
message << "D".ord
|
||||
message << [6].pack('l>').unpack('CCCC') # 4
|
||||
message << mode.ord
|
||||
message << 0 # unnamed statement
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent D message"
|
||||
end
|
||||
|
||||
def send_execute_message(limit=0)
|
||||
message = []
|
||||
message << "E".ord
|
||||
message << [9].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << [limit].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent E message"
|
||||
end
|
||||
|
||||
def send_sync_message
|
||||
message = []
|
||||
message << "S".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent S message"
|
||||
end
|
||||
|
||||
def send_copydone_message
|
||||
message = []
|
||||
message << "c".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent c message"
|
||||
end
|
||||
|
||||
def send_copyfail_message
|
||||
message = []
|
||||
message << "f".ord
|
||||
message << [5].pack('l>').unpack('CCCC') # 4
|
||||
message << 0
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent f message"
|
||||
end
|
||||
|
||||
def send_flush_message
|
||||
message = []
|
||||
message << "H".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent H message"
|
||||
end
|
||||
|
||||
def read_from_server()
|
||||
output_messages = []
|
||||
retry_count = 0
|
||||
message_code = nil
|
||||
message_len = 0
|
||||
loop do
|
||||
begin
|
||||
message_code, message_len = @socket.recv_nonblock(5).unpack("al>")
|
||||
rescue IO::WaitReadable
|
||||
return output_messages if retry_count > 50
|
||||
|
||||
retry_count += 1
|
||||
sleep(0.01)
|
||||
next
|
||||
end
|
||||
message = {
|
||||
code: message_code,
|
||||
len: message_len,
|
||||
bytes: []
|
||||
}
|
||||
log "[B] #{BACKEND_MESSAGE_CODES[message_code] || ('UnknownMessage(' + message_code + ')')}"
|
||||
|
||||
actual_message_length = message_len - 4
|
||||
if actual_message_length > 0
|
||||
message[:bytes] = @socket.recv(message_len - 4).unpack("C*")
|
||||
log "\t#{message[:bytes].join(",")}"
|
||||
log "\t#{message[:bytes].map(&:chr).join(" ")}"
|
||||
end
|
||||
output_messages << message
|
||||
return output_messages if message_code == 'Z'
|
||||
end
|
||||
end
|
||||
|
||||
def log(msg)
|
||||
return unless @verbose
|
||||
|
||||
puts msg
|
||||
end
|
||||
|
||||
def close
|
||||
@socket.close
|
||||
end
|
||||
end
|
||||
@@ -2,6 +2,7 @@ require 'json'
|
||||
require 'ostruct'
|
||||
require_relative 'pgcat_process'
|
||||
require_relative 'pg_instance'
|
||||
require_relative 'pg_socket'
|
||||
|
||||
class ::Hash
|
||||
def deep_merge(second)
|
||||
|
||||
@@ -65,7 +65,7 @@ describe "Least Outstanding Queries Load Balancing" do
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
context "under homogenous load" do
|
||||
context "under homogeneous load" do
|
||||
it "balances query volume between all instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
|
||||
155
tests/ruby/protocol_spec.rb
Normal file
155
tests/ruby/protocol_spec.rb
Normal file
@@ -0,0 +1,155 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
|
||||
describe "Portocol handling" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "session") }
|
||||
let(:sequence) { [] }
|
||||
let(:pgcat_socket) { PostgresSocket.new('localhost', processes.pgcat.port) }
|
||||
let(:pgdb_socket) { PostgresSocket.new('localhost', processes.all_databases.first.port) }
|
||||
|
||||
after do
|
||||
pgdb_socket.close
|
||||
pgcat_socket.close
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
def run_comparison(sequence, socket_a, socket_b)
|
||||
sequence.each do |msg, *args|
|
||||
socket_a.send(msg, *args)
|
||||
socket_b.send(msg, *args)
|
||||
|
||||
compare_messages(
|
||||
socket_a.read_from_server,
|
||||
socket_b.read_from_server
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def compare_messages(msg_arr0, msg_arr1)
|
||||
if msg_arr0.count != msg_arr1.count
|
||||
error_output = []
|
||||
|
||||
error_output << "#{msg_arr0.count} : #{msg_arr1.count}"
|
||||
error_output << "PgCat Messages"
|
||||
error_output += msg_arr0.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" }
|
||||
error_output << "PgServer Messages"
|
||||
error_output += msg_arr1.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" }
|
||||
error_desc = error_output.join("\n")
|
||||
raise StandardError, "Message count mismatch #{error_desc}"
|
||||
end
|
||||
|
||||
(0..msg_arr0.count - 1).all? do |i|
|
||||
msg0 = msg_arr0[i]
|
||||
msg1 = msg_arr1[i]
|
||||
|
||||
result = [
|
||||
msg0[:code] == msg1[:code],
|
||||
msg0[:len] == msg1[:len],
|
||||
msg0[:bytes] == msg1[:bytes],
|
||||
].all?
|
||||
|
||||
next result if result
|
||||
|
||||
if result == false
|
||||
error_string = []
|
||||
if msg0[:code] != msg1[:code]
|
||||
error_string << "code #{msg0[:code]} != #{msg1[:code]}"
|
||||
end
|
||||
if msg0[:len] != msg1[:len]
|
||||
error_string << "len #{msg0[:len]} != #{msg1[:len]}"
|
||||
end
|
||||
if msg0[:bytes] != msg1[:bytes]
|
||||
error_string << "bytes #{msg0[:bytes]} != #{msg1[:bytes]}"
|
||||
end
|
||||
err = error_string.join("\n")
|
||||
|
||||
raise StandardError, "Message mismatch #{err}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
RSpec.shared_examples "at parity with database" do
|
||||
before do
|
||||
pgcat_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
pgdb_socket.send_startup_message("sharding_user", "shard0", "sharding_user")
|
||||
end
|
||||
|
||||
it "works" do
|
||||
run_comparison(sequence, pgcat_socket, pgdb_socket)
|
||||
end
|
||||
end
|
||||
|
||||
context "Cancel Query" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_query_message, "SELECT pg_sleep(5)"],
|
||||
[:cancel_query]
|
||||
]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
xcontext "Simple query after parse" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 5"],
|
||||
[:send_query_message, "SELECT 1"],
|
||||
[:send_bind_message],
|
||||
[:send_describe_message, "P"],
|
||||
[:send_execute_message],
|
||||
[:send_sync_message],
|
||||
]
|
||||
}
|
||||
|
||||
# Known to fail due to PgCat not supporting flush
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
xcontext "Flush message" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 1"],
|
||||
[:send_flush_message]
|
||||
]
|
||||
}
|
||||
|
||||
# Known to fail due to PgCat not supporting flush
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
xcontext "Bind without parse" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_bind_message]
|
||||
]
|
||||
}
|
||||
# This is known to fail.
|
||||
# Server responds immediately, Proxy buffers the message
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
context "Simple message" do
|
||||
let(:sequence) {
|
||||
[[:send_query_message, "SELECT 1"]]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
context "Extended protocol" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 1"],
|
||||
[:send_bind_message],
|
||||
[:send_describe_message, "P"],
|
||||
[:send_execute_message],
|
||||
[:send_sync_message],
|
||||
]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
end
|
||||
@@ -27,7 +27,7 @@ describe "Sharding" do
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
describe "automatic routing of extended procotol" do
|
||||
describe "automatic routing of extended protocol" do
|
||||
it "can do it" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.exec("SET SERVER ROLE TO 'auto'")
|
||||
|
||||
1
utilities/requirements.txt
Normal file
1
utilities/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
tomli
|
||||
Reference in New Issue
Block a user