mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
@@ -32,6 +32,9 @@ ban_time = 60 # Seconds
|
||||
#
|
||||
autoreload = true
|
||||
|
||||
tls_certificate = ".circleci/server.cert"
|
||||
tls_private_key = ".circleci/server.key"
|
||||
|
||||
#
|
||||
# User to use for authentication against the server.
|
||||
[user]
|
||||
|
||||
21
.circleci/server.cert
Normal file
21
.circleci/server.cert
Normal file
@@ -0,0 +1,21 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIDazCCAlOgAwIBAgIUChIvUGFJGJe5EDch32rchqoxER0wDQYJKoZIhvcNAQEL
|
||||
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
|
||||
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMjA2MjcyMjI2MDZaFw0yMjA3
|
||||
MjcyMjI2MDZaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
|
||||
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
|
||||
AQUAA4IBDwAwggEKAoIBAQDdTwrBzV1v79faVckFvIn/9V4fypYs4vDi3X+h3wGn
|
||||
AjEh6mmizlKCwSwAam07D9Q5zKiXFrzNJqzSioOv5zsOAvObwrnzbtKSwfs3aP5g
|
||||
eEh2clHCZYx9p06WszPcgSB5nTz1NeY4XAwvGn3A+SVCLyPMTNwnem48+ONh2F9u
|
||||
FHtSuIsEVvTjMlH09O7LjwJlODxy3HNv2JHYM5Hx9tzc+NVYdERPtaVcX8ycw1Eh
|
||||
9hgGSgfaNM52/JfRMIDhENrsn0S1omRUtcJe72loreiwrECUOLAnAfp9Xqc+rMPP
|
||||
aLA6ElzmYef1+ZEC0p6isCHPhxY5ESVhKYhE9nQvksjnAgMBAAGjUzBRMB0GA1Ud
|
||||
DgQWBBQLDtzexqjx7xPtUZuZB/angU9oSDAfBgNVHSMEGDAWgBQLDtzexqjx7xPt
|
||||
UZuZB/angU9oSDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQC/
|
||||
mxY/a/WeLENVj2Gg9EUH0CKzfqeTey1mb6YfPGxzrD7oq1m0Vn2MmTbjZrJgh/Ob
|
||||
QckO3ElF4kC9+6XP+iDPmabGpjeLgllBboT5l2aqnD1syMrf61WPLzgRzRfplYGy
|
||||
cjBQDDKPu8Lu0QRMWU28tHYN0bMxJoCuXysGGX5WsuFnKCA6f/V+nycJJXxJH3eB
|
||||
eLjTueD9/RE3OXhi6m8A29Q1E9AE5EF4uRxYXrr91BmYnk4aFvSmBxhUEzE12eSN
|
||||
lHB/uSc0+Dp+UVmVr6wW8AQfd16UBA0BUf3kSW3aSvirYPYH0rXiOOpEJgOwOMnR
|
||||
f5+XAbN1Y+3OsFz/ZmP9
|
||||
-----END CERTIFICATE-----
|
||||
28
.circleci/server.key
Normal file
28
.circleci/server.key
Normal file
@@ -0,0 +1,28 @@
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDdTwrBzV1v79fa
|
||||
VckFvIn/9V4fypYs4vDi3X+h3wGnAjEh6mmizlKCwSwAam07D9Q5zKiXFrzNJqzS
|
||||
ioOv5zsOAvObwrnzbtKSwfs3aP5geEh2clHCZYx9p06WszPcgSB5nTz1NeY4XAwv
|
||||
Gn3A+SVCLyPMTNwnem48+ONh2F9uFHtSuIsEVvTjMlH09O7LjwJlODxy3HNv2JHY
|
||||
M5Hx9tzc+NVYdERPtaVcX8ycw1Eh9hgGSgfaNM52/JfRMIDhENrsn0S1omRUtcJe
|
||||
72loreiwrECUOLAnAfp9Xqc+rMPPaLA6ElzmYef1+ZEC0p6isCHPhxY5ESVhKYhE
|
||||
9nQvksjnAgMBAAECggEAbnvddO9frFhivJ+DIhgEFQKcIOb0nigV9kx6QYehvYy8
|
||||
lp/+aMb0Lk7d9r8rFQdL/icMK5GwZALg2KNKJvEbbF1Q3PwT9VHoUlgBYKJMDEFA
|
||||
e9GKu7ASuVBjTZzdUUItwkkbe5eS/aQGeSWSjlpTnX0HNCFS72qRymK+scRhsAQf
|
||||
ZoHyZHDslkvPR3Pos+sndWBYCDHag5/KoPhsMt1+5S9NQcOUHx9Ac0gLHjau3N+P
|
||||
0FhODHFFGnnpyQvLvj6u3ZOR34ladMgoBglE0O3vPFhckn92EK4teeTWOsUMotiz
|
||||
qM3QIJTOJjtiY6VDGY93bIa4pFvt7Zi4vIerenKt0QKBgQD/UMFqfevTAMrk10AC
|
||||
bOa4+cM07ORY4ZwVj5ILhZn+8crDEEtBsUyuEU2FTINtnoEq1yGc/IXpsyS1BHjL
|
||||
L1xSml5LN3jInbi8z5XQfY5Sj3VOMtwY6yD20jcdeDC44rz3nStXdkcMWxbTMapx
|
||||
iOPsap5ciUKOMS7LyMidPEG/LQKBgQDd5vHgrLN0FBIIm+vZg6MEm4QyobstVp4l
|
||||
7V/GZsdL+M8AQv1Rx+5wSUSWKomOIv5lglis7f6g0c9O7Qkr78/wzoyoKC2RRqPp
|
||||
I90GjY2Iv22N4GIkRrDAgMZbkTitzIB6tbXEVeLAOh3frFJ8IwauRCOiXIjrZdJ4
|
||||
FvV86+nU4wKBgQDdWTP2kWkMrBk7QOp7r9Jv+AmnLuHhtOdPQgOJ/bA++X2ik9PL
|
||||
Bl3GY7XjpSwks1CkxZKcucmXjPp7/X6EGXFfI/owF82dkDADca0e7lufdERtIWb0
|
||||
K5WOpz2lTPhgsiLGQfq7fw2lxqsJOnvcpqOD6gOVkmKjSDyb7F0RBJazmQKBgQDD
|
||||
a8PQTcesjpBjLI3EfX1vbVY7ENu6zfFxDV+vZoxVh8UlQdm90AlYse3JIaUKnB7W
|
||||
Xrihcucv0hZ0N6RAIW5LcFvHK7sVmdR4WbEpODhRGeTtcZJ8yBSZM898jKQRy2vK
|
||||
pYRyaADNsWDlvujVkjMr/a40KrIaPQ3h3LZNUaYYaQKBgQD1x8A5S5SiE1cN1vFr
|
||||
aACkmA2WqEDKKhUsUigJdwW6WB/B9kWlIlz/iV1H9uwBXtSIYG4VqCSTAvh0z4gX
|
||||
Qu2SrdPm5PYnKzpdynpz78OnGdflD1RKWFGHItR6GN6tj/VmulO6mlFvT4jzBQ7j
|
||||
+Hf8m2TcD4U3ksz3xw+YOD+cmA==
|
||||
-----END RSA PRIVATE KEY-----
|
||||
172
Cargo.lock
generated
172
Cargo.lock
generated
@@ -79,12 +79,24 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.0.73"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
@@ -236,6 +248,21 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.58"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c3fac17f7123a73ca62df411b1bf727ccc805daa070338fda671c86dac1bdc27"
|
||||
dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.126"
|
||||
@@ -385,6 +412,7 @@ dependencies = [
|
||||
"parking_lot",
|
||||
"rand",
|
||||
"regex",
|
||||
"rustls-pemfile",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"sha-1",
|
||||
@@ -392,6 +420,7 @@ dependencies = [
|
||||
"sqlparser",
|
||||
"stringprep",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"toml",
|
||||
]
|
||||
|
||||
@@ -497,12 +526,58 @@ version = "0.6.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.16.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"spin",
|
||||
"untrusted",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring",
|
||||
"sct",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9"
|
||||
dependencies = [
|
||||
"base64",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "sct"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.136"
|
||||
@@ -563,6 +638,12 @@ version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
|
||||
|
||||
[[package]]
|
||||
name = "spin"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.14.0"
|
||||
@@ -664,6 +745,17 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.23.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
|
||||
dependencies = [
|
||||
"rustls",
|
||||
"tokio",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.5.8"
|
||||
@@ -700,6 +792,12 @@ version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
|
||||
|
||||
[[package]]
|
||||
name = "untrusted"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.4"
|
||||
@@ -712,6 +810,80 @@ version = "0.10.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen"
|
||||
version = "0.2.81"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c53b543413a17a202f4be280a7e5c62a1c69345f5de525ee64f8cfdbc954994"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"wasm-bindgen-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-backend"
|
||||
version = "0.2.81"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5491a68ab4500fa6b4d726bd67408630c3dbe9c4fe7bda16d5c82a1fd8c7340a"
|
||||
dependencies = [
|
||||
"bumpalo",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-macro"
|
||||
version = "0.2.81"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c441e177922bc58f1e12c022624b6216378e5febc2f0533e41ba443d505b80aa"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"wasm-bindgen-macro-support",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-macro-support"
|
||||
version = "0.2.81"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"wasm-bindgen-backend",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-shared"
|
||||
version = "0.2.81"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a89911bd99e5f3659ec4acf9c4d93b0a90fe4a2a11f15328472058edc5261be"
|
||||
|
||||
[[package]]
|
||||
name = "web-sys"
|
||||
version = "0.3.58"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2fed94beee57daf8dd7d51f2b15dc2bcde92d7a72304cdf662a4371008b71b90"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
|
||||
@@ -29,3 +29,5 @@ hmac = "0.12"
|
||||
sha2 = "0.10"
|
||||
base64 = "0.13"
|
||||
stringprep = "0.1"
|
||||
tokio-rustls = "0.23"
|
||||
rustls-pemfile = "1"
|
||||
|
||||
@@ -32,6 +32,10 @@ ban_time = 60 # Seconds
|
||||
# Reload config automatically if it changes.
|
||||
autoreload = false
|
||||
|
||||
# TLS
|
||||
# tls_certificate = "server.cert"
|
||||
# tls_private_key = "server.key"
|
||||
|
||||
#
|
||||
# User to use for authentication against the server.
|
||||
[user]
|
||||
|
||||
53
src/admin.rs
53
src/admin.rs
@@ -2,7 +2,6 @@
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{info, trace};
|
||||
use std::collections::HashMap;
|
||||
use tokio::net::tcp::OwnedWriteHalf;
|
||||
|
||||
use crate::config::{get_config, reload_config};
|
||||
use crate::errors::Error;
|
||||
@@ -12,12 +11,15 @@ use crate::stats::get_stats;
|
||||
use crate::ClientServerMap;
|
||||
|
||||
/// Handle admin client.
|
||||
pub async fn handle_admin(
|
||||
stream: &mut OwnedWriteHalf,
|
||||
pub async fn handle_admin<T>(
|
||||
stream: &mut T,
|
||||
mut query: BytesMut,
|
||||
pool: ConnectionPool,
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let code = query.get_u8() as char;
|
||||
|
||||
if code != 'Q' {
|
||||
@@ -61,7 +63,10 @@ pub async fn handle_admin(
|
||||
}
|
||||
|
||||
/// Column-oriented statistics.
|
||||
async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
async fn show_lists<T>(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let stats = get_stats();
|
||||
|
||||
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
|
||||
@@ -128,7 +133,10 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
|
||||
}
|
||||
|
||||
/// Show PgCat version.
|
||||
async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
async fn show_version<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let mut res = BytesMut::new();
|
||||
|
||||
res.put(row_description(&vec![("version", DataType::Text)]));
|
||||
@@ -143,7 +151,10 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
}
|
||||
|
||||
/// Show utilization of connection pools for each shard and replicas.
|
||||
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
async fn show_pools<T>(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let stats = get_stats();
|
||||
let config = get_config();
|
||||
|
||||
@@ -197,7 +208,10 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
|
||||
}
|
||||
|
||||
/// Show shards and replicas.
|
||||
async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
async fn show_databases<T>(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let config = get_config();
|
||||
|
||||
// Columns
|
||||
@@ -258,15 +272,18 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R
|
||||
|
||||
/// Ignore any SET commands the client sends.
|
||||
/// This is common initialization done by ORMs.
|
||||
async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
async fn ignore_set<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
custom_protocol_response_ok(stream, "SET").await
|
||||
}
|
||||
|
||||
/// Reload the configuration file without restarting the process.
|
||||
async fn reload(
|
||||
stream: &mut OwnedWriteHalf,
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<(), Error> {
|
||||
async fn reload<T>(stream: &mut T, client_server_map: ClientServerMap) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
info!("Reloading config");
|
||||
|
||||
reload_config(client_server_map).await?;
|
||||
@@ -286,7 +303,10 @@ async fn reload(
|
||||
}
|
||||
|
||||
/// Shows current configuration.
|
||||
async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
async fn show_config<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let config = &get_config();
|
||||
let config: HashMap<String, String> = config.into();
|
||||
|
||||
@@ -329,7 +349,10 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||
}
|
||||
|
||||
/// Show shard and replicas statistics.
|
||||
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
async fn show_stats<T>(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("total_xact_count", DataType::Numeric),
|
||||
|
||||
471
src/client.rs
471
src/client.rs
@@ -1,12 +1,9 @@
|
||||
/// Handle clients by pretending to be a PostgreSQL server.
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{debug, error, trace};
|
||||
use log::{debug, error, info, trace};
|
||||
use std::collections::HashMap;
|
||||
use tokio::io::{AsyncReadExt, BufReader};
|
||||
use tokio::net::{
|
||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||
TcpStream,
|
||||
};
|
||||
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use crate::admin::handle_admin;
|
||||
use crate::config::get_config;
|
||||
@@ -17,20 +14,33 @@ use crate::pool::{get_pool, ClientServerMap};
|
||||
use crate::query_router::{Command, QueryRouter};
|
||||
use crate::server::Server;
|
||||
use crate::stats::{get_reporter, Reporter};
|
||||
use crate::tls::Tls;
|
||||
|
||||
use tokio_rustls::server::TlsStream;
|
||||
|
||||
/// Type of connection received from client.
|
||||
enum ClientConnectionType {
|
||||
Startup,
|
||||
Tls,
|
||||
CancelQuery,
|
||||
}
|
||||
|
||||
/// The client state. One of these is created per client.
|
||||
pub struct Client {
|
||||
pub struct Client<S, T> {
|
||||
/// The reads are buffered (8K by default).
|
||||
read: BufReader<OwnedReadHalf>,
|
||||
read: BufReader<S>,
|
||||
|
||||
/// We buffer the writes ourselves because we know the protocol
|
||||
/// better than a stock buffer.
|
||||
write: OwnedWriteHalf,
|
||||
write: T,
|
||||
|
||||
/// Internal buffer, where we place messages until we have to flush
|
||||
/// them to the backend.
|
||||
buffer: BytesMut,
|
||||
|
||||
/// Address
|
||||
addr: std::net::SocketAddr,
|
||||
|
||||
/// The client was started with the sole reason to cancel another running query.
|
||||
cancel_mode: bool,
|
||||
|
||||
@@ -63,161 +73,307 @@ pub struct Client {
|
||||
last_server_id: Option<i32>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Perform client startup sequence.
|
||||
/// See docs: <https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.3>
|
||||
/// Client entrypoint.
|
||||
pub async fn client_entrypoint(
|
||||
mut stream: TcpStream,
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<(), Error> {
|
||||
// Figure out if the client wants TLS or not.
|
||||
let addr = stream.peer_addr().unwrap();
|
||||
|
||||
match get_startup::<TcpStream>(&mut stream).await {
|
||||
// Client requested a TLS connection.
|
||||
Ok((ClientConnectionType::Tls, _)) => {
|
||||
let config = get_config();
|
||||
|
||||
// TLS settings are configured, will setup TLS now.
|
||||
if config.general.tls_certificate != None {
|
||||
debug!("Accepting TLS request");
|
||||
|
||||
let mut yes = BytesMut::new();
|
||||
yes.put_u8(b'S');
|
||||
write_all(&mut stream, yes).await?;
|
||||
|
||||
// Negotiate TLS.
|
||||
match startup_tls(stream, client_server_map).await {
|
||||
Ok(mut client) => {
|
||||
info!("Client {:?} connected (TLS)", addr);
|
||||
|
||||
client.handle().await
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
// TLS is not configured, we cannot offer it.
|
||||
else {
|
||||
// Rejecting client request for TLS.
|
||||
let mut no = BytesMut::new();
|
||||
no.put_u8(b'N');
|
||||
write_all(&mut stream, no).await?;
|
||||
|
||||
// Attempting regular startup. Client can disconnect now
|
||||
// if they choose.
|
||||
match get_startup::<TcpStream>(&mut stream).await {
|
||||
// Client accepted unencrypted connection.
|
||||
Ok((ClientConnectionType::Startup, bytes)) => {
|
||||
let (read, write) = split(stream);
|
||||
|
||||
// Continue with regular startup.
|
||||
match Client::startup(read, write, addr, bytes, client_server_map).await {
|
||||
Ok(mut client) => {
|
||||
info!("Client {:?} connected (plain)", addr);
|
||||
|
||||
client.handle().await
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
// Client probably disconnected rejecting our plain text connection.
|
||||
_ => Err(Error::ProtocolSyncError),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Client wants to use plain connection without encryption.
|
||||
Ok((ClientConnectionType::Startup, bytes)) => {
|
||||
let (read, write) = split(stream);
|
||||
|
||||
// Continue with regular startup.
|
||||
match Client::startup(read, write, addr, bytes, client_server_map).await {
|
||||
Ok(mut client) => {
|
||||
info!("Client {:?} connected (plain)", addr);
|
||||
|
||||
client.handle().await
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
// Client wants to cancel a query.
|
||||
Ok((ClientConnectionType::CancelQuery, bytes)) => {
|
||||
let (read, write) = split(stream);
|
||||
|
||||
// Continue with cancel query request.
|
||||
match Client::cancel(read, write, addr, bytes, client_server_map).await {
|
||||
Ok(mut client) => {
|
||||
info!("Client {:?} issued a cancel query request", addr);
|
||||
|
||||
client.handle().await
|
||||
}
|
||||
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
// Something failed, probably the socket.
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle the first message the client sends.
|
||||
async fn get_startup<S>(stream: &mut S) -> Result<(ClientConnectionType, BytesMut), Error>
|
||||
where
|
||||
S: tokio::io::AsyncRead + std::marker::Unpin + tokio::io::AsyncWrite,
|
||||
{
|
||||
// Get startup message length.
|
||||
let len = match stream.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::ClientBadStartup),
|
||||
};
|
||||
|
||||
// Get the rest of the message.
|
||||
let mut startup = vec![0u8; len as usize - 4];
|
||||
match stream.read_exact(&mut startup).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::ClientBadStartup),
|
||||
};
|
||||
|
||||
let mut bytes = BytesMut::from(&startup[..]);
|
||||
let code = bytes.get_i32();
|
||||
|
||||
match code {
|
||||
// Client is requesting SSL (TLS).
|
||||
SSL_REQUEST_CODE => Ok((ClientConnectionType::Tls, bytes)),
|
||||
|
||||
// Client wants to use plain text, requesting regular startup.
|
||||
PROTOCOL_VERSION_NUMBER => Ok((ClientConnectionType::Startup, bytes)),
|
||||
|
||||
// Client is requesting to cancel a running query (plain text connection).
|
||||
CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)),
|
||||
|
||||
// Something else, probably something is wrong and it's not our fault,
|
||||
// e.g. badly implemented Postgres client.
|
||||
_ => Err(Error::ProtocolSyncError),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle TLS connection negotation.
|
||||
pub async fn startup_tls(
|
||||
stream: TcpStream,
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<Client<ReadHalf<TlsStream<TcpStream>>, WriteHalf<TlsStream<TcpStream>>>, Error> {
|
||||
// Negotiate TLS.
|
||||
let tls = Tls::new()?;
|
||||
let addr = stream.peer_addr().unwrap();
|
||||
|
||||
let mut stream = match tls.acceptor.accept(stream).await {
|
||||
Ok(stream) => stream,
|
||||
|
||||
// TLS negotitation failed.
|
||||
Err(err) => {
|
||||
error!("TLS negotiation failed: {:?}", err);
|
||||
return Err(Error::TlsError);
|
||||
}
|
||||
};
|
||||
|
||||
// TLS negotitation successful.
|
||||
// Continue with regular startup using encrypted connection.
|
||||
match get_startup::<TlsStream<TcpStream>>(&mut stream).await {
|
||||
// Got good startup message, proceeding like normal except we
|
||||
// are encrypted now.
|
||||
Ok((ClientConnectionType::Startup, bytes)) => {
|
||||
let (read, write) = split(stream);
|
||||
|
||||
Client::startup(read, write, addr, bytes, client_server_map).await
|
||||
}
|
||||
|
||||
// Bad Postgres client.
|
||||
_ => Err(Error::ProtocolSyncError),
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> Client<S, T>
|
||||
where
|
||||
S: tokio::io::AsyncRead + std::marker::Unpin,
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
/// Handle Postgres client startup after TLS negotiation is complete
|
||||
/// or over plain text.
|
||||
pub async fn startup(
|
||||
mut stream: TcpStream,
|
||||
mut read: S,
|
||||
mut write: T,
|
||||
addr: std::net::SocketAddr,
|
||||
bytes: BytesMut, // The rest of the startup message.
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<Client, Error> {
|
||||
) -> Result<Client<S, T>, Error> {
|
||||
let config = get_config();
|
||||
let transaction_mode = config.general.pool_mode == "transaction";
|
||||
let stats = get_reporter();
|
||||
|
||||
loop {
|
||||
trace!("Waiting for StartupMessage");
|
||||
trace!("Got StartupMessage");
|
||||
let parameters = parse_startup(bytes.clone())?;
|
||||
|
||||
// Could be StartupMessage, SSLRequest or CancelRequest.
|
||||
let len = match stream.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::ClientBadStartup),
|
||||
};
|
||||
// Generate random backend ID and secret key
|
||||
let process_id: i32 = rand::random();
|
||||
let secret_key: i32 = rand::random();
|
||||
|
||||
let mut startup = vec![0u8; len as usize - 4];
|
||||
// Perform MD5 authentication.
|
||||
// TODO: Add SASL support.
|
||||
let salt = md5_challenge(&mut write).await?;
|
||||
|
||||
match stream.read_exact(&mut startup).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::ClientBadStartup),
|
||||
};
|
||||
let code = match read.read_u8().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
let mut bytes = BytesMut::from(&startup[..]);
|
||||
let code = bytes.get_i32();
|
||||
|
||||
match code {
|
||||
// Client wants SSL. We don't support it at the moment.
|
||||
SSL_REQUEST_CODE => {
|
||||
trace!("Rejecting SSLRequest");
|
||||
|
||||
let mut no = BytesMut::with_capacity(1);
|
||||
no.put_u8(b'N');
|
||||
|
||||
write_all(&mut stream, no).await?;
|
||||
}
|
||||
|
||||
// Regular startup message.
|
||||
PROTOCOL_VERSION_NUMBER => {
|
||||
trace!("Got StartupMessage");
|
||||
let parameters = parse_startup(bytes.clone())?;
|
||||
|
||||
// Generate random backend ID and secret key
|
||||
let process_id: i32 = rand::random();
|
||||
let secret_key: i32 = rand::random();
|
||||
|
||||
// Perform MD5 authentication.
|
||||
// TODO: Add SASL support.
|
||||
let salt = md5_challenge(&mut stream).await?;
|
||||
|
||||
let code = match stream.read_u8().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
// PasswordMessage
|
||||
if code as char != 'p' {
|
||||
debug!("Expected p, got {}", code as char);
|
||||
return Err(Error::ProtocolSyncError);
|
||||
}
|
||||
|
||||
let len = match stream.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||
|
||||
match stream.read_exact(&mut password_response).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
// Compare server and client hashes.
|
||||
let password_hash =
|
||||
md5_hash_password(&config.user.name, &config.user.password, &salt);
|
||||
|
||||
if password_hash != password_response {
|
||||
debug!("Password authentication failed");
|
||||
wrong_password(&mut stream, &config.user.name).await?;
|
||||
return Err(Error::ClientError);
|
||||
}
|
||||
|
||||
debug!("Password authentication successful");
|
||||
|
||||
auth_ok(&mut stream).await?;
|
||||
write_all(&mut stream, get_pool().server_info()).await?;
|
||||
backend_key_data(&mut stream, process_id, secret_key).await?;
|
||||
ready_for_query(&mut stream).await?;
|
||||
|
||||
trace!("Startup OK");
|
||||
|
||||
let database = parameters
|
||||
.get("database")
|
||||
.unwrap_or(parameters.get("user").unwrap());
|
||||
let admin = ["pgcat", "pgbouncer"]
|
||||
.iter()
|
||||
.filter(|db| *db == &database)
|
||||
.count()
|
||||
== 1;
|
||||
|
||||
// Split the read and write streams
|
||||
// so we can control buffering.
|
||||
let (read, write) = stream.into_split();
|
||||
|
||||
return Ok(Client {
|
||||
read: BufReader::new(read),
|
||||
write: write,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
cancel_mode: false,
|
||||
transaction_mode: transaction_mode,
|
||||
process_id: process_id,
|
||||
secret_key: secret_key,
|
||||
client_server_map: client_server_map,
|
||||
parameters: parameters,
|
||||
stats: stats,
|
||||
admin: admin,
|
||||
last_address_id: None,
|
||||
last_server_id: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Query cancel request.
|
||||
CANCEL_REQUEST_CODE => {
|
||||
let (read, write) = stream.into_split();
|
||||
|
||||
let process_id = bytes.get_i32();
|
||||
let secret_key = bytes.get_i32();
|
||||
|
||||
return Ok(Client {
|
||||
read: BufReader::new(read),
|
||||
write: write,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
cancel_mode: true,
|
||||
transaction_mode: transaction_mode,
|
||||
process_id: process_id,
|
||||
secret_key: secret_key,
|
||||
client_server_map: client_server_map,
|
||||
parameters: HashMap::new(),
|
||||
stats: stats,
|
||||
admin: false,
|
||||
last_address_id: None,
|
||||
last_server_id: None,
|
||||
});
|
||||
}
|
||||
|
||||
_ => {
|
||||
return Err(Error::ProtocolSyncError);
|
||||
}
|
||||
};
|
||||
// PasswordMessage
|
||||
if code as char != 'p' {
|
||||
debug!("Expected p, got {}", code as char);
|
||||
return Err(Error::ProtocolSyncError);
|
||||
}
|
||||
|
||||
let len = match read.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||
|
||||
match read.read_exact(&mut password_response).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
// Compare server and client hashes.
|
||||
let password_hash = md5_hash_password(&config.user.name, &config.user.password, &salt);
|
||||
|
||||
if password_hash != password_response {
|
||||
debug!("Password authentication failed");
|
||||
wrong_password(&mut write, &config.user.name).await?;
|
||||
return Err(Error::ClientError);
|
||||
}
|
||||
|
||||
debug!("Password authentication successful");
|
||||
|
||||
auth_ok(&mut write).await?;
|
||||
write_all(&mut write, get_pool().server_info()).await?;
|
||||
backend_key_data(&mut write, process_id, secret_key).await?;
|
||||
ready_for_query(&mut write).await?;
|
||||
|
||||
trace!("Startup OK");
|
||||
|
||||
let database = parameters
|
||||
.get("database")
|
||||
.unwrap_or(parameters.get("user").unwrap());
|
||||
let admin = ["pgcat", "pgbouncer"]
|
||||
.iter()
|
||||
.filter(|db| *db == &database)
|
||||
.count()
|
||||
== 1;
|
||||
|
||||
// Split the read and write streams
|
||||
// so we can control buffering.
|
||||
|
||||
return Ok(Client {
|
||||
read: BufReader::new(read),
|
||||
write: write,
|
||||
addr,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
cancel_mode: false,
|
||||
transaction_mode: transaction_mode,
|
||||
process_id: process_id,
|
||||
secret_key: secret_key,
|
||||
client_server_map: client_server_map,
|
||||
parameters: parameters,
|
||||
stats: stats,
|
||||
admin: admin,
|
||||
last_address_id: None,
|
||||
last_server_id: None,
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle cancel request.
|
||||
pub async fn cancel(
|
||||
read: S,
|
||||
write: T,
|
||||
addr: std::net::SocketAddr,
|
||||
mut bytes: BytesMut, // The rest of the startup message.
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<Client<S, T>, Error> {
|
||||
let process_id = bytes.get_i32();
|
||||
let secret_key = bytes.get_i32();
|
||||
|
||||
let config = get_config();
|
||||
let transaction_mode = config.general.pool_mode == "transaction";
|
||||
let stats = get_reporter();
|
||||
|
||||
return Ok(Client {
|
||||
read: BufReader::new(read),
|
||||
write: write,
|
||||
addr,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
cancel_mode: true,
|
||||
transaction_mode: transaction_mode,
|
||||
process_id: process_id,
|
||||
secret_key: secret_key,
|
||||
client_server_map: client_server_map,
|
||||
parameters: HashMap::new(),
|
||||
stats: stats,
|
||||
admin: false,
|
||||
last_address_id: None,
|
||||
last_server_id: None,
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a connected and authenticated client.
|
||||
@@ -415,7 +571,7 @@ impl Client {
|
||||
|
||||
debug!(
|
||||
"Client {:?} talking to server {:?}",
|
||||
self.write.peer_addr().unwrap(),
|
||||
self.addr,
|
||||
server.address()
|
||||
);
|
||||
|
||||
@@ -650,8 +806,11 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Client {
|
||||
impl<S, T> Drop for Client<S, T> {
|
||||
fn drop(&mut self) {
|
||||
let mut guard = self.client_server_map.lock();
|
||||
guard.remove(&(self.process_id, self.secret_key));
|
||||
|
||||
// Update statistics.
|
||||
if let Some(address_id) = self.last_address_id {
|
||||
self.stats.client_disconnecting(self.process_id, address_id);
|
||||
@@ -660,5 +819,7 @@ impl Drop for Client {
|
||||
self.stats.server_idle(process_id, address_id);
|
||||
}
|
||||
}
|
||||
|
||||
// self.release();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,12 +4,14 @@ use log::{error, info};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde_derive::Deserialize;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use toml;
|
||||
|
||||
use crate::errors::Error;
|
||||
use crate::tls::{load_certs, load_keys};
|
||||
use crate::{ClientServerMap, ConnectionPool};
|
||||
|
||||
/// Globally available configuration.
|
||||
@@ -111,6 +113,8 @@ pub struct General {
|
||||
pub healthcheck_timeout: u64,
|
||||
pub ban_time: i64,
|
||||
pub autoreload: bool,
|
||||
pub tls_certificate: Option<String>,
|
||||
pub tls_private_key: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for General {
|
||||
@@ -124,6 +128,8 @@ impl Default for General {
|
||||
healthcheck_timeout: 1000,
|
||||
ban_time: 60,
|
||||
autoreload: false,
|
||||
tls_certificate: None,
|
||||
tls_private_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -249,6 +255,25 @@ impl Config {
|
||||
info!("Primary reads: {}", self.query_router.primary_reads_enabled);
|
||||
info!("Query router: {}", self.query_router.query_parser_enabled);
|
||||
info!("Number of shards: {}", self.shards.len());
|
||||
|
||||
match self.general.tls_certificate.clone() {
|
||||
Some(tls_certificate) => {
|
||||
info!("TLS certificate: {}", tls_certificate);
|
||||
|
||||
match self.general.tls_private_key.clone() {
|
||||
Some(tls_private_key) => {
|
||||
info!("TLS private key: {}", tls_private_key);
|
||||
info!("TLS support is enabled");
|
||||
}
|
||||
|
||||
None => (),
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
info!("TLS support is disabled");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -368,6 +393,37 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
// Validate TLS!
|
||||
match config.general.tls_certificate.clone() {
|
||||
Some(tls_certificate) => {
|
||||
match load_certs(&Path::new(&tls_certificate)) {
|
||||
Ok(_) => {
|
||||
// Cert is okay, but what about the private key?
|
||||
match config.general.tls_private_key.clone() {
|
||||
Some(tls_private_key) => match load_keys(&Path::new(&tls_private_key)) {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
error!("tls_private_key is incorrectly configured: {:?}", err);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
},
|
||||
|
||||
None => {
|
||||
error!("tls_certificate is set, but the tls_private_key is not");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
error!("tls_certificate is incorrectly configured: {:?}", err);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => (),
|
||||
};
|
||||
|
||||
config.path = path.to_string();
|
||||
|
||||
// Update the configuration globally.
|
||||
|
||||
@@ -10,4 +10,5 @@ pub enum Error {
|
||||
BadConfig,
|
||||
AllServersDown,
|
||||
ClientError,
|
||||
TlsError,
|
||||
}
|
||||
|
||||
31
src/main.rs
31
src/main.rs
@@ -28,10 +28,12 @@ extern crate log;
|
||||
extern crate md5;
|
||||
extern crate num_cpus;
|
||||
extern crate once_cell;
|
||||
extern crate rustls_pemfile;
|
||||
extern crate serde;
|
||||
extern crate serde_derive;
|
||||
extern crate sqlparser;
|
||||
extern crate tokio;
|
||||
extern crate tokio_rustls;
|
||||
extern crate toml;
|
||||
|
||||
use log::{debug, error, info};
|
||||
@@ -58,6 +60,7 @@ mod scram;
|
||||
mod server;
|
||||
mod sharding;
|
||||
mod stats;
|
||||
mod tls;
|
||||
|
||||
use config::{get_config, reload_config};
|
||||
use pool::{ClientServerMap, ConnectionPool};
|
||||
@@ -150,30 +153,20 @@ async fn main() {
|
||||
// Handle client.
|
||||
tokio::task::spawn(async move {
|
||||
let start = chrono::offset::Utc::now().naive_utc();
|
||||
match client::Client::startup(socket, client_server_map).await {
|
||||
Ok(mut client) => {
|
||||
info!("Client {:?} connected", addr);
|
||||
|
||||
match client.handle().await {
|
||||
Ok(()) => {
|
||||
let duration = chrono::offset::Utc::now().naive_utc() - start;
|
||||
match client::client_entrypoint(socket, client_server_map).await {
|
||||
Ok(_) => {
|
||||
let duration = chrono::offset::Utc::now().naive_utc() - start;
|
||||
|
||||
info!(
|
||||
"Client {:?} disconnected, session duration: {}",
|
||||
addr,
|
||||
format_duration(&duration)
|
||||
);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
error!("Client disconnected with error: {:?}", err);
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"Client {:?} disconnected, session duration: {}",
|
||||
addr,
|
||||
format_duration(&duration)
|
||||
);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
debug!("Client failed to login: {:?}", err);
|
||||
debug!("Client disconnected with error {:?}", err);
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
@@ -2,11 +2,8 @@
|
||||
/// and handle TcpStream (TCP socket).
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use md5::{Digest, Md5};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::{
|
||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||
TcpStream,
|
||||
};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use crate::errors::Error;
|
||||
use std::collections::HashMap;
|
||||
@@ -30,7 +27,10 @@ impl From<&DataType> for i32 {
|
||||
}
|
||||
|
||||
/// Tell the client that authentication handshake completed successfully.
|
||||
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
pub async fn auth_ok<S>(stream: &mut S) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let mut auth_ok = BytesMut::with_capacity(9);
|
||||
|
||||
auth_ok.put_u8(b'R');
|
||||
@@ -41,7 +41,10 @@ pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
}
|
||||
|
||||
/// Generate md5 password challenge.
|
||||
pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> {
|
||||
pub async fn md5_challenge<S>(stream: &mut S) -> Result<[u8; 4], Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
// let mut rng = rand::thread_rng();
|
||||
let salt: [u8; 4] = [
|
||||
rand::random(),
|
||||
@@ -62,11 +65,14 @@ pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> {
|
||||
|
||||
/// Give the client the process_id and secret we generated
|
||||
/// used in query cancellation.
|
||||
pub async fn backend_key_data(
|
||||
stream: &mut TcpStream,
|
||||
pub async fn backend_key_data<S>(
|
||||
stream: &mut S,
|
||||
backend_id: i32,
|
||||
secret_key: i32,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let mut key_data = BytesMut::from(&b"K"[..]);
|
||||
key_data.put_i32(12);
|
||||
key_data.put_i32(backend_id);
|
||||
@@ -87,7 +93,10 @@ pub fn simple_query(query: &str) -> BytesMut {
|
||||
}
|
||||
|
||||
/// Tell the client we're ready for another query.
|
||||
pub async fn ready_for_query(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
pub async fn ready_for_query<S>(stream: &mut S) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let mut bytes = BytesMut::with_capacity(5);
|
||||
|
||||
bytes.put_u8(b'Z');
|
||||
@@ -205,12 +214,15 @@ pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec<u8> {
|
||||
|
||||
/// Send password challenge response to the server.
|
||||
/// This is the MD5 challenge.
|
||||
pub async fn md5_password(
|
||||
stream: &mut TcpStream,
|
||||
pub async fn md5_password<S>(
|
||||
stream: &mut S,
|
||||
user: &str,
|
||||
password: &str,
|
||||
salt: &[u8],
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let password = md5_hash_password(user, password, salt);
|
||||
|
||||
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
|
||||
@@ -225,10 +237,10 @@ pub async fn md5_password(
|
||||
/// Implements a response to our custom `SET SHARDING KEY`
|
||||
/// and `SET SERVER ROLE` commands.
|
||||
/// This tells the client we're ready for the next query.
|
||||
pub async fn custom_protocol_response_ok(
|
||||
stream: &mut OwnedWriteHalf,
|
||||
message: &str,
|
||||
) -> Result<(), Error> {
|
||||
pub async fn custom_protocol_response_ok<S>(stream: &mut S, message: &str) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let mut res = BytesMut::with_capacity(25);
|
||||
|
||||
let set_complete = BytesMut::from(&format!("{}\0", message)[..]);
|
||||
@@ -250,7 +262,10 @@ pub async fn custom_protocol_response_ok(
|
||||
/// Send a custom error message to the client.
|
||||
/// Tell the client we are ready for the next query and no rollback is necessary.
|
||||
/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
|
||||
pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Result<(), Error> {
|
||||
pub async fn error_response<S>(stream: &mut S, message: &str) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let mut error = BytesMut::new();
|
||||
|
||||
// Error level
|
||||
@@ -291,7 +306,10 @@ pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Resul
|
||||
Ok(write_all_half(stream, res).await?)
|
||||
}
|
||||
|
||||
pub async fn wrong_password(stream: &mut TcpStream, user: &str) -> Result<(), Error> {
|
||||
pub async fn wrong_password<S>(stream: &mut S, user: &str) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let mut error = BytesMut::new();
|
||||
|
||||
// Error level
|
||||
@@ -325,11 +343,10 @@ pub async fn wrong_password(stream: &mut TcpStream, user: &str) -> Result<(), Er
|
||||
}
|
||||
|
||||
/// Respond to a SHOW SHARD command.
|
||||
pub async fn show_response(
|
||||
stream: &mut OwnedWriteHalf,
|
||||
name: &str,
|
||||
value: &str,
|
||||
) -> Result<(), Error> {
|
||||
pub async fn show_response<S>(stream: &mut S, name: &str, value: &str) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
// A SELECT response consists of:
|
||||
// 1. RowDescription
|
||||
// 2. One or more DataRow
|
||||
@@ -430,7 +447,10 @@ pub fn command_complete(command: &str) -> BytesMut {
|
||||
}
|
||||
|
||||
/// Write all data in the buffer to the TcpStream.
|
||||
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
||||
pub async fn write_all<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
match stream.write_all(&buf).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
@@ -438,7 +458,10 @@ pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Erro
|
||||
}
|
||||
|
||||
/// Write all the data in the buffer to the TcpStream, write owned half (see mpsc).
|
||||
pub async fn write_all_half(stream: &mut OwnedWriteHalf, buf: BytesMut) -> Result<(), Error> {
|
||||
pub async fn write_all_half<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
|
||||
where
|
||||
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
match stream.write_all(&buf).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
@@ -446,7 +469,10 @@ pub async fn write_all_half(stream: &mut OwnedWriteHalf, buf: BytesMut) -> Resul
|
||||
}
|
||||
|
||||
/// Read a complete message from the socket.
|
||||
pub async fn read_message(stream: &mut BufReader<OwnedReadHalf>) -> Result<BytesMut, Error> {
|
||||
pub async fn read_message<S>(stream: &mut S) -> Result<BytesMut, Error>
|
||||
where
|
||||
S: tokio::io::AsyncRead + std::marker::Unpin,
|
||||
{
|
||||
let code = match stream.read_u8().await {
|
||||
Ok(code) => code,
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
|
||||
57
src/tls.rs
Normal file
57
src/tls.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
// Stream wrapper.
|
||||
|
||||
use rustls_pemfile::{certs, rsa_private_keys};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio_rustls::rustls::{self, Certificate, PrivateKey};
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
|
||||
use crate::config::get_config;
|
||||
use crate::errors::Error;
|
||||
|
||||
// TLS
|
||||
pub fn load_certs(path: &Path) -> std::io::Result<Vec<Certificate>> {
|
||||
certs(&mut std::io::BufReader::new(std::fs::File::open(path)?))
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))
|
||||
.map(|mut certs| certs.drain(..).map(Certificate).collect())
|
||||
}
|
||||
|
||||
pub fn load_keys(path: &Path) -> std::io::Result<Vec<PrivateKey>> {
|
||||
rsa_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?))
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid key"))
|
||||
.map(|mut keys| keys.drain(..).map(PrivateKey).collect())
|
||||
}
|
||||
|
||||
pub struct Tls {
|
||||
pub acceptor: TlsAcceptor,
|
||||
}
|
||||
|
||||
impl Tls {
|
||||
pub fn new() -> Result<Self, Error> {
|
||||
let config = get_config();
|
||||
|
||||
let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) {
|
||||
Ok(certs) => certs,
|
||||
Err(_) => return Err(Error::TlsError),
|
||||
};
|
||||
|
||||
let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) {
|
||||
Ok(keys) => keys,
|
||||
Err(_) => return Err(Error::TlsError),
|
||||
};
|
||||
|
||||
let config = match rustls::ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, keys.remove(0))
|
||||
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(Error::TlsError),
|
||||
};
|
||||
|
||||
Ok(Tls {
|
||||
acceptor: TlsAcceptor::from(Arc::new(config)),
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user