From b974aacd719b076036ae8b1a60f682db2d578e63 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 27 Jun 2022 09:46:33 -0700 Subject: [PATCH 1/9] check --- Cargo.lock | 172 ++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + pgcat.toml | 4 ++ src/client.rs | 12 ++-- src/config.rs | 4 ++ src/errors.rs | 1 + src/main.rs | 3 + src/messages.rs | 42 +++++++----- src/stream.rs | 160 ++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 378 insertions(+), 22 deletions(-) create mode 100644 src/stream.rs diff --git a/Cargo.lock b/Cargo.lock index b8b4bfd..cfaa0fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,12 +79,24 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" + [[package]] name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + [[package]] name = "cfg-if" version = "1.0.0" @@ -236,6 +248,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "js-sys" +version = "0.3.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fac17f7123a73ca62df411b1bf727ccc805daa070338fda671c86dac1bdc27" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.126" @@ -385,6 +412,7 @@ dependencies = [ "parking_lot", "rand", "regex", + "rustls-pemfile", "serde", "serde_derive", "sha-1", @@ -392,6 +420,7 @@ dependencies = [ "sqlparser", "stringprep", "tokio", + "tokio-rustls", "toml", ] @@ -497,12 +526,58 @@ version = "0.6.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + +[[package]] +name = "rustls" +version = "0.20.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" +dependencies = [ + "base64", +] + [[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "serde" version = "1.0.136" @@ -563,6 +638,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "sqlparser" version = "0.14.0" @@ -664,6 +745,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "toml" version = "0.5.8" @@ -700,6 +792,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "version_check" version = "0.9.4" @@ -712,6 +810,80 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasm-bindgen" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c53b543413a17a202f4be280a7e5c62a1c69345f5de525ee64f8cfdbc954994" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5491a68ab4500fa6b4d726bd67408630c3dbe9c4fe7bda16d5c82a1fd8c7340a" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c441e177922bc58f1e12c022624b6216378e5febc2f0533e41ba443d505b80aa" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a89911bd99e5f3659ec4acf9c4d93b0a90fe4a2a11f15328472058edc5261be" + +[[package]] +name = "web-sys" +version = "0.3.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fed94beee57daf8dd7d51f2b15dc2bcde92d7a72304cdf662a4371008b71b90" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index fa63c0e..b193e77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,5 @@ hmac = "0.12" sha2 = "0.10" base64 = "0.13" stringprep = "0.1" +tokio-rustls = "*" +rustls-pemfile = "*" diff --git a/pgcat.toml b/pgcat.toml index 70b2fae..b2a6b76 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -32,6 +32,10 @@ ban_time = 60 # Seconds # Reload config automatically if it changes. autoreload = false +# TLS +tls_certificate = "server.cert" +tls_private_key = "server.key" + # # User to use for authentication against the server. [user] diff --git a/src/client.rs b/src/client.rs index ea1bf01..c02fcf8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,13 +19,13 @@ use crate::server::Server; use crate::stats::{get_reporter, Reporter}; /// The client state. One of these is created per client. -pub struct Client { +pub struct Client { /// The reads are buffered (8K by default). - read: BufReader, + read: BufReader, /// We buffer the writes ourselves because we know the protocol /// better than a stock buffer. - write: OwnedWriteHalf, + write: S, /// Internal buffer, where we place messages until we have to flush /// them to the backend. @@ -63,13 +63,13 @@ pub struct Client { last_server_id: Option, } -impl Client { +impl Client { /// Perform client startup sequence. /// See docs: pub async fn startup( mut stream: TcpStream, client_server_map: ClientServerMap, - ) -> Result { + ) -> Result, Error> { let config = get_config(); let transaction_mode = config.general.pool_mode == "transaction"; let stats = get_reporter(); @@ -650,7 +650,7 @@ impl Client { } } -impl Drop for Client { +impl Drop for Client { fn drop(&mut self) { // Update statistics. if let Some(address_id) = self.last_address_id { diff --git a/src/config.rs b/src/config.rs index f6fa129..1700541 100644 --- a/src/config.rs +++ b/src/config.rs @@ -111,6 +111,8 @@ pub struct General { pub healthcheck_timeout: u64, pub ban_time: i64, pub autoreload: bool, + pub tls_certificate: Option, + pub tls_private_key: Option, } impl Default for General { @@ -124,6 +126,8 @@ impl Default for General { healthcheck_timeout: 1000, ban_time: 60, autoreload: false, + tls_certificate: None, + tls_private_key: None, } } } diff --git a/src/errors.rs b/src/errors.rs index b07d508..cc8f65d 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -10,4 +10,5 @@ pub enum Error { BadConfig, AllServersDown, ClientError, + TlsError, } diff --git a/src/main.rs b/src/main.rs index 7b78e5b..05bfb23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,6 +33,8 @@ extern crate serde_derive; extern crate sqlparser; extern crate tokio; extern crate toml; +extern crate tokio_rustls; +extern crate rustls_pemfile; use log::{debug, error, info}; use parking_lot::Mutex; @@ -58,6 +60,7 @@ mod scram; mod server; mod sharding; mod stats; +mod stream; use config::{get_config, reload_config}; use pool::{ClientServerMap, ConnectionPool}; diff --git a/src/messages.rs b/src/messages.rs index 993545b..960a3b6 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -30,7 +30,8 @@ impl From<&DataType> for i32 { } /// Tell the client that authentication handshake completed successfully. -pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> { +pub async fn auth_ok(stream: &mut S) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { let mut auth_ok = BytesMut::with_capacity(9); auth_ok.put_u8(b'R'); @@ -41,7 +42,8 @@ pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> { } /// Generate md5 password challenge. -pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> { +pub async fn md5_challenge(stream: &mut S) -> Result<[u8; 4], Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { // let mut rng = rand::thread_rng(); let salt: [u8; 4] = [ rand::random(), @@ -62,11 +64,12 @@ pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> { /// Give the client the process_id and secret we generated /// used in query cancellation. -pub async fn backend_key_data( - stream: &mut TcpStream, +pub async fn backend_key_data( + stream: &mut S, backend_id: i32, secret_key: i32, -) -> Result<(), Error> { +) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { let mut key_data = BytesMut::from(&b"K"[..]); key_data.put_i32(12); key_data.put_i32(backend_id); @@ -87,7 +90,8 @@ pub fn simple_query(query: &str) -> BytesMut { } /// Tell the client we're ready for another query. -pub async fn ready_for_query(stream: &mut TcpStream) -> Result<(), Error> { +pub async fn ready_for_query(stream: &mut S) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { let mut bytes = BytesMut::with_capacity(5); bytes.put_u8(b'Z'); @@ -205,12 +209,13 @@ pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec { /// Send password challenge response to the server. /// This is the MD5 challenge. -pub async fn md5_password( - stream: &mut TcpStream, +pub async fn md5_password( + stream: &mut S, user: &str, password: &str, salt: &[u8], -) -> Result<(), Error> { +) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { let password = md5_hash_password(user, password, salt); let mut message = BytesMut::with_capacity(password.len() as usize + 5); @@ -225,10 +230,11 @@ pub async fn md5_password( /// Implements a response to our custom `SET SHARDING KEY` /// and `SET SERVER ROLE` commands. /// This tells the client we're ready for the next query. -pub async fn custom_protocol_response_ok( - stream: &mut OwnedWriteHalf, +pub async fn custom_protocol_response_ok( + stream: &mut S, message: &str, -) -> Result<(), Error> { +) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { let mut res = BytesMut::with_capacity(25); let set_complete = BytesMut::from(&format!("{}\0", message)[..]); @@ -250,7 +256,8 @@ pub async fn custom_protocol_response_ok( /// Send a custom error message to the client. /// Tell the client we are ready for the next query and no rollback is necessary. /// Docs on error codes: . -pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Result<(), Error> { +pub async fn error_response(stream: &mut S, message: &str) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { let mut error = BytesMut::new(); // Error level @@ -291,7 +298,8 @@ pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Resul Ok(write_all_half(stream, res).await?) } -pub async fn wrong_password(stream: &mut TcpStream, user: &str) -> Result<(), Error> { +pub async fn wrong_password(stream: &mut S, user: &str) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { let mut error = BytesMut::new(); // Error level @@ -430,7 +438,8 @@ pub fn command_complete(command: &str) -> BytesMut { } /// Write all data in the buffer to the TcpStream. -pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> { +pub async fn write_all(stream: &mut S, buf: BytesMut) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { match stream.write_all(&buf).await { Ok(_) => Ok(()), Err(_) => return Err(Error::SocketError), @@ -438,7 +447,8 @@ pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Erro } /// Write all the data in the buffer to the TcpStream, write owned half (see mpsc). -pub async fn write_all_half(stream: &mut OwnedWriteHalf, buf: BytesMut) -> Result<(), Error> { +pub async fn write_all_half(stream: &mut S, buf: BytesMut) -> Result<(), Error> +where S: tokio::io::AsyncWrite + std::marker::Unpin { match stream.write_all(&buf).await { Ok(_) => Ok(()), Err(_) => return Err(Error::SocketError), diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..41d3cd1 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,160 @@ +// Stream wrapper. + +use bytes::{Buf, BufMut, BytesMut}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, split, ReadHalf, WriteHalf}; +use tokio::net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, +}; +use tokio_rustls::server::TlsStream; +use rustls_pemfile::{certs, rsa_private_keys}; +use tokio_rustls::rustls::{self, Certificate, PrivateKey}; +use tokio_rustls::TlsAcceptor; +use std::sync::Arc; +use std::path::Path; + +use crate::config::get_config; +use crate::errors::Error; + +// TLS +fn load_certs(path: &std::path::Path) -> std::io::Result> { + certs(&mut std::io::BufReader::new(std::fs::File::open(path)?)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert")) + .map(|mut certs| certs.drain(..).map(Certificate).collect()) +} + +fn load_keys(path: &std::path::Path) -> std::io::Result> { + rsa_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid key")) + .map(|mut keys| keys.drain(..).map(PrivateKey).collect()) +} + +struct Tls { + acceptor: TlsAcceptor, +} + +impl Tls { + pub fn new() -> Result { + let config = get_config(); + + let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) { + Ok(certs) => certs, + Err(_) => return Err(Error::TlsError), + }; + + let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) { + Ok(keys) => keys, + Err(_) => return Err(Error::TlsError), + }; + + let config = match rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, keys.remove(0)) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err)) { + Ok(c) => c, + Err(_) => return Err(Error::TlsError) + }; + + Ok(Tls { + acceptor: TlsAcceptor::from(Arc::new(config)), + }) + } +} + +struct Stream { + read: Option>, + write: Option, + tls_read: Option>>>, + tls_write: Option>>, +} + + +impl Stream { + pub async fn new(stream: TcpStream, tls: Option) -> Result { + + let config = get_config(); + + match tls { + None => { + let (read, write) = stream.into_split(); + let read = BufReader::new(read); + Ok( + Self { + read: Some(read), + write: Some(write), + tls_read: None, + tls_write: None, + } + ) + } + + Some(tls) => { + let mut tls_stream = match tls.acceptor.accept(stream).await { + Ok(stream) => stream, + Err(_) => return Err(Error::TlsError), + }; + + let (read, write) = split(tls_stream); + + Ok(Self{ + read: None, + write: None, + tls_read: Some(BufReader::new(read)), + tls_write: Some(write), + }) + } + } + } + + async fn read(stream: &mut S) -> Result + where S: tokio::io::AsyncRead + std::marker::Unpin { + + let code = match stream.read_u8().await { + Ok(code) => code, + Err(_) => return Err(Error::SocketError), + }; + + let len = match stream.read_i32().await { + Ok(len) => len, + Err(_) => return Err(Error::SocketError), + }; + + let mut buf = vec![0u8; len as usize - 4]; + + match stream.read_exact(&mut buf).await { + Ok(_) => (), + Err(_) => return Err(Error::SocketError), + }; + + let mut bytes = BytesMut::with_capacity(len as usize + 1); + + bytes.put_u8(code); + bytes.put_i32(len); + bytes.put_slice(&buf); + + Ok(bytes) + } + + async fn write(stream: &mut S, buf: &BytesMut) -> Result<(), Error> + where S: tokio::io::AsyncWrite + std::marker::Unpin { + match stream.write_all(buf).await { + Ok(_) => Ok(()), + Err(_) => return Err(Error::SocketError), + } + } + + pub async fn read_message(&mut self) -> Result { + match &self.read { + Some(read) => Self::read(self.read.as_mut().unwrap()).await, + None => Self::read(self.tls_read.as_mut().unwrap()).await, + } + } + + pub async fn write_all(&mut self, buf: &BytesMut) -> Result<(), Error> { + match &self.write { + Some(write) => Self::write(self.write.as_mut().unwrap(), buf).await, + None => Self::write(self.tls_write.as_mut().unwrap(), buf).await, + } + } +} \ No newline at end of file From eb5892087000028704a332c41dd08ebb05ae7f70 Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 15:52:01 -0700 Subject: [PATCH 2/9] at least it compiles --- src/admin.rs | 54 ++++-- src/client.rs | 498 +++++++++++++++++++++++++++++++++--------------- src/main.rs | 65 ++++--- src/messages.rs | 59 ++++-- src/stream.rs | 185 +++++++----------- 5 files changed, 538 insertions(+), 323 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index b7a5b6f..622c14a 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -2,7 +2,7 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{info, trace}; use std::collections::HashMap; -use tokio::net::tcp::OwnedWriteHalf; +// use tokio::net::tcp::T; use crate::config::{get_config, reload_config}; use crate::errors::Error; @@ -12,12 +12,15 @@ use crate::stats::get_stats; use crate::ClientServerMap; /// Handle admin client. -pub async fn handle_admin( - stream: &mut OwnedWriteHalf, +pub async fn handle_admin( + stream: &mut T, mut query: BytesMut, pool: ConnectionPool, client_server_map: ClientServerMap, -) -> Result<(), Error> { +) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let code = query.get_u8() as char; if code != 'Q' { @@ -61,7 +64,10 @@ pub async fn handle_admin( } /// Column-oriented statistics. -async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_lists(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let stats = get_stats(); let columns = vec![("list", DataType::Text), ("items", DataType::Int4)]; @@ -128,7 +134,10 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul } /// Show PgCat version. -async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +async fn show_version(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut res = BytesMut::new(); res.put(row_description(&vec![("version", DataType::Text)])); @@ -143,7 +152,10 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> { } /// Show utilization of connection pools for each shard and replicas. -async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_pools(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let stats = get_stats(); let config = get_config(); @@ -197,7 +209,10 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul } /// Show shards and replicas. -async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_databases(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let config = get_config(); // Columns @@ -258,15 +273,18 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R /// Ignore any SET commands the client sends. /// This is common initialization done by ORMs. -async fn ignore_set(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +async fn ignore_set(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ custom_protocol_response_ok(stream, "SET").await } /// Reload the configuration file without restarting the process. -async fn reload( - stream: &mut OwnedWriteHalf, - client_server_map: ClientServerMap, -) -> Result<(), Error> { +async fn reload(stream: &mut T, client_server_map: ClientServerMap) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ info!("Reloading config"); reload_config(client_server_map).await?; @@ -286,7 +304,10 @@ async fn reload( } /// Shows current configuration. -async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +async fn show_config(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let config = &get_config(); let config: HashMap = config.into(); @@ -329,7 +350,10 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> { } /// Show shard and replicas statistics. -async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { +async fn show_stats(stream: &mut T, pool: &ConnectionPool) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ let columns = vec![ ("database", DataType::Text), ("total_xact_count", DataType::Numeric), diff --git a/src/client.rs b/src/client.rs index c02fcf8..4e8ba73 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{debug, error, trace}; use std::collections::HashMap; -use tokio::io::{AsyncReadExt, BufReader}; +use tokio::io::{split, AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpStream, @@ -17,15 +17,25 @@ use crate::pool::{get_pool, ClientServerMap}; use crate::query_router::{Command, QueryRouter}; use crate::server::Server; use crate::stats::{get_reporter, Reporter}; +use crate::stream::Tls; + +use tokio_rustls::server::TlsStream; + +/// Type of connection received from client. +enum ClientConnectionType { + Startup, + Tls, + CancelQuery, +} /// The client state. One of these is created per client. -pub struct Client { +pub struct Client { /// The reads are buffered (8K by default). - read: BufReader, + read: BufReader, /// We buffer the writes ourselves because we know the protocol /// better than a stock buffer. - write: S, + write: T, /// Internal buffer, where we place messages until we have to flush /// them to the backend. @@ -63,163 +73,347 @@ pub struct Client { last_server_id: Option, } -impl Client { - /// Perform client startup sequence. - /// See docs: - pub async fn startup( - mut stream: TcpStream, +/// Main client loop. +pub async fn client_loop( + mut stream: TcpStream, + client_server_map: ClientServerMap, +) -> Result<(), Error> { + match get_startup::(&mut stream).await { + Ok((ClientConnectionType::Tls, bytes)) => { + match startup_tls(stream, client_server_map).await { + Ok(mut client) => client.handle().await, + Err(err) => Err(err), + } + } + + Ok((ClientConnectionType::Startup, bytes)) => { + let (read, write) = split(stream); + match Client::handle_startup(read, write, bytes, client_server_map).await { + Ok(mut client) => client.handle().await, + Err(err) => Err(err), + } + } + + Ok((ClientConnectionType::CancelQuery, bytes)) => { + return Err(Error::ProtocolSyncError); + } + + Err(err) => Err(err), + } +} + +async fn get_startup(stream: &mut S) -> Result<(ClientConnectionType, BytesMut), Error> +where + S: tokio::io::AsyncRead + std::marker::Unpin + tokio::io::AsyncWrite, +{ + // Get startup message length. + let len = match stream.read_i32().await { + Ok(len) => len, + Err(_) => return Err(Error::ClientBadStartup), + }; + + // Get the rest of the message. + let mut startup = vec![0u8; len as usize - 4]; + match stream.read_exact(&mut startup).await { + Ok(_) => (), + Err(_) => return Err(Error::ClientBadStartup), + }; + + let mut bytes = BytesMut::from(&startup[..]); + let code = bytes.get_i32(); + + match code { + // Client is requesting SSL (TLS). + SSL_REQUEST_CODE => Ok((ClientConnectionType::Tls, bytes)), + + // Client wants to use plain text, requesting regular startup. + PROTOCOL_VERSION_NUMBER => Ok((ClientConnectionType::Startup, bytes)), + + // Client is requesting to cancel a running query (plain text connection). + CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)), + _ => Err(Error::ProtocolSyncError), + } +} + +/// Handle TLS connection negotation. +pub async fn startup_tls( + mut stream: TcpStream, + client_server_map: ClientServerMap, +) -> Result>, WriteHalf>>, Error> { + // Accept SSL request if SSL is configured. + let mut yes = BytesMut::new(); + yes.put_u8(b'S'); + write_all(&mut stream, yes).await?; + + // Negotiate TLS. + let mut tls = Tls::new().unwrap(); + let mut stream = match tls.acceptor.accept(stream).await { + Ok(stream) => stream, + Err(_) => return Err(Error::TlsError), + }; + + match get_startup::>(&mut stream).await { + Ok((ClientConnectionType::Startup, bytes)) => { + let (read, write) = split(stream); + Client::handle_startup(read, write, bytes, client_server_map).await + } + _ => Err(Error::ProtocolSyncError), + } +} + +impl Client +where + S: tokio::io::AsyncRead + std::marker::Unpin, + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ + // Perform client startup sequence in TLS. + pub async fn handle_startup( + mut read: S, + mut write: T, + bytes: BytesMut, // The rest of the startup message. client_server_map: ClientServerMap, - ) -> Result, Error> { + ) -> Result, Error> { let config = get_config(); let transaction_mode = config.general.pool_mode == "transaction"; let stats = get_reporter(); - loop { - trace!("Waiting for StartupMessage"); + trace!("Got StartupMessage"); + let parameters = parse_startup(bytes.clone())?; - // Could be StartupMessage, SSLRequest or CancelRequest. - let len = match stream.read_i32().await { - Ok(len) => len, - Err(_) => return Err(Error::ClientBadStartup), - }; + // Generate random backend ID and secret key + let process_id: i32 = rand::random(); + let secret_key: i32 = rand::random(); - let mut startup = vec![0u8; len as usize - 4]; + // Perform MD5 authentication. + // TODO: Add SASL support. + let salt = md5_challenge(&mut write).await?; - match stream.read_exact(&mut startup).await { - Ok(_) => (), - Err(_) => return Err(Error::ClientBadStartup), - }; + let code = match read.read_u8().await { + Ok(p) => p, + Err(_) => return Err(Error::SocketError), + }; - let mut bytes = BytesMut::from(&startup[..]); - let code = bytes.get_i32(); - - match code { - // Client wants SSL. We don't support it at the moment. - SSL_REQUEST_CODE => { - trace!("Rejecting SSLRequest"); - - let mut no = BytesMut::with_capacity(1); - no.put_u8(b'N'); - - write_all(&mut stream, no).await?; - } - - // Regular startup message. - PROTOCOL_VERSION_NUMBER => { - trace!("Got StartupMessage"); - let parameters = parse_startup(bytes.clone())?; - - // Generate random backend ID and secret key - let process_id: i32 = rand::random(); - let secret_key: i32 = rand::random(); - - // Perform MD5 authentication. - // TODO: Add SASL support. - let salt = md5_challenge(&mut stream).await?; - - let code = match stream.read_u8().await { - Ok(p) => p, - Err(_) => return Err(Error::SocketError), - }; - - // PasswordMessage - if code as char != 'p' { - debug!("Expected p, got {}", code as char); - return Err(Error::ProtocolSyncError); - } - - let len = match stream.read_i32().await { - Ok(len) => len, - Err(_) => return Err(Error::SocketError), - }; - - let mut password_response = vec![0u8; (len - 4) as usize]; - - match stream.read_exact(&mut password_response).await { - Ok(_) => (), - Err(_) => return Err(Error::SocketError), - }; - - // Compare server and client hashes. - let password_hash = - md5_hash_password(&config.user.name, &config.user.password, &salt); - - if password_hash != password_response { - debug!("Password authentication failed"); - wrong_password(&mut stream, &config.user.name).await?; - return Err(Error::ClientError); - } - - debug!("Password authentication successful"); - - auth_ok(&mut stream).await?; - write_all(&mut stream, get_pool().server_info()).await?; - backend_key_data(&mut stream, process_id, secret_key).await?; - ready_for_query(&mut stream).await?; - - trace!("Startup OK"); - - let database = parameters - .get("database") - .unwrap_or(parameters.get("user").unwrap()); - let admin = ["pgcat", "pgbouncer"] - .iter() - .filter(|db| *db == &database) - .count() - == 1; - - // Split the read and write streams - // so we can control buffering. - let (read, write) = stream.into_split(); - - return Ok(Client { - read: BufReader::new(read), - write: write, - buffer: BytesMut::with_capacity(8196), - cancel_mode: false, - transaction_mode: transaction_mode, - process_id: process_id, - secret_key: secret_key, - client_server_map: client_server_map, - parameters: parameters, - stats: stats, - admin: admin, - last_address_id: None, - last_server_id: None, - }); - } - - // Query cancel request. - CANCEL_REQUEST_CODE => { - let (read, write) = stream.into_split(); - - let process_id = bytes.get_i32(); - let secret_key = bytes.get_i32(); - - return Ok(Client { - read: BufReader::new(read), - write: write, - buffer: BytesMut::with_capacity(8196), - cancel_mode: true, - transaction_mode: transaction_mode, - process_id: process_id, - secret_key: secret_key, - client_server_map: client_server_map, - parameters: HashMap::new(), - stats: stats, - admin: false, - last_address_id: None, - last_server_id: None, - }); - } - - _ => { - return Err(Error::ProtocolSyncError); - } - }; + // PasswordMessage + if code as char != 'p' { + debug!("Expected p, got {}", code as char); + return Err(Error::ProtocolSyncError); } + + let len = match read.read_i32().await { + Ok(len) => len, + Err(_) => return Err(Error::SocketError), + }; + + let mut password_response = vec![0u8; (len - 4) as usize]; + + match read.read_exact(&mut password_response).await { + Ok(_) => (), + Err(_) => return Err(Error::SocketError), + }; + + // Compare server and client hashes. + let password_hash = md5_hash_password(&config.user.name, &config.user.password, &salt); + + if password_hash != password_response { + debug!("Password authentication failed"); + wrong_password(&mut write, &config.user.name).await?; + return Err(Error::ClientError); + } + + debug!("Password authentication successful"); + + auth_ok(&mut write).await?; + write_all(&mut write, get_pool().server_info()).await?; + backend_key_data(&mut write, process_id, secret_key).await?; + ready_for_query(&mut write).await?; + + trace!("Startup OK"); + + let database = parameters + .get("database") + .unwrap_or(parameters.get("user").unwrap()); + let admin = ["pgcat", "pgbouncer"] + .iter() + .filter(|db| *db == &database) + .count() + == 1; + + // Split the read and write streams + // so we can control buffering. + + return Ok(Client { + read: BufReader::new(read), + write: write, + buffer: BytesMut::with_capacity(8196), + cancel_mode: false, + transaction_mode: transaction_mode, + process_id: process_id, + secret_key: secret_key, + client_server_map: client_server_map, + parameters: parameters, + stats: stats, + admin: admin, + last_address_id: None, + last_server_id: None, + }); } + /// Perform client startup sequence. + /// See docs: + // pub async fn startup( + // mut stream: TcpStream, + // client_server_map: ClientServerMap, + // ) -> Result, WriteHalf>, Error> { + // let config = get_config(); + // let transaction_mode = config.general.pool_mode == "transaction"; + // let stats = get_reporter(); + + // loop { + // trace!("Waiting for StartupMessage"); + + // // Could be StartupMessage, SSLRequest or CancelRequest. + // let len = match stream.read_i32().await { + // Ok(len) => len, + // Err(_) => return Err(Error::ClientBadStartup), + // }; + + // let mut startup = vec![0u8; len as usize - 4]; + + // match stream.read_exact(&mut startup).await { + // Ok(_) => (), + // Err(_) => return Err(Error::ClientBadStartup), + // }; + + // let mut bytes = BytesMut::from(&startup[..]); + // let code = bytes.get_i32(); + + // match code { + // // Client wants SSL. We don't support it at the moment. + // SSL_REQUEST_CODE => { + // trace!("Rejecting SSLRequest"); + + // let mut no = BytesMut::with_capacity(1); + // no.put_u8(b'N'); + + // write_all(&mut stream, no).await?; + // } + + // // Regular startup message. + // PROTOCOL_VERSION_NUMBER => { + // trace!("Got StartupMessage"); + // let parameters = parse_startup(bytes.clone())?; + + // // Generate random backend ID and secret key + // let process_id: i32 = rand::random(); + // let secret_key: i32 = rand::random(); + + // // Perform MD5 authentication. + // // TODO: Add SASL support. + // let salt = md5_challenge(&mut stream).await?; + + // let code = match stream.read_u8().await { + // Ok(p) => p, + // Err(_) => return Err(Error::SocketError), + // }; + + // // PasswordMessage + // if code as char != 'p' { + // debug!("Expected p, got {}", code as char); + // return Err(Error::ProtocolSyncError); + // } + + // let len = match stream.read_i32().await { + // Ok(len) => len, + // Err(_) => return Err(Error::SocketError), + // }; + + // let mut password_response = vec![0u8; (len - 4) as usize]; + + // match stream.read_exact(&mut password_response).await { + // Ok(_) => (), + // Err(_) => return Err(Error::SocketError), + // }; + + // // Compare server and client hashes. + // let password_hash = + // md5_hash_password(&config.user.name, &config.user.password, &salt); + + // if password_hash != password_response { + // debug!("Password authentication failed"); + // wrong_password(&mut stream, &config.user.name).await?; + // return Err(Error::ClientError); + // } + + // debug!("Password authentication successful"); + + // auth_ok(&mut stream).await?; + // write_all(&mut stream, get_pool().server_info()).await?; + // backend_key_data(&mut stream, process_id, secret_key).await?; + // ready_for_query(&mut stream).await?; + + // trace!("Startup OK"); + + // let database = parameters + // .get("database") + // .unwrap_or(parameters.get("user").unwrap()); + // let admin = ["pgcat", "pgbouncer"] + // .iter() + // .filter(|db| *db == &database) + // .count() + // == 1; + + // // Split the read and write streams + // // so we can control buffering. + // let (read, write) = split(stream); + + // return Ok(Client { + // read: BufReader::new(read), + // write: write, + // buffer: BytesMut::with_capacity(8196), + // cancel_mode: false, + // transaction_mode: transaction_mode, + // process_id: process_id, + // secret_key: secret_key, + // client_server_map: client_server_map, + // parameters: parameters, + // stats: stats, + // admin: admin, + // last_address_id: None, + // last_server_id: None, + // }); + // } + + // // Query cancel request. + // CANCEL_REQUEST_CODE => { + // let (read, write) = split(stream); + + // let process_id = bytes.get_i32(); + // let secret_key = bytes.get_i32(); + + // return Ok(Client { + // read: BufReader::new(read), + // write: write, + // buffer: BytesMut::with_capacity(8196), + // cancel_mode: true, + // transaction_mode: transaction_mode, + // process_id: process_id, + // secret_key: secret_key, + // client_server_map: client_server_map, + // parameters: HashMap::new(), + // stats: stats, + // admin: false, + // last_address_id: None, + // last_server_id: None, + // }); + // } + + // _ => { + // return Err(Error::ProtocolSyncError); + // } + // }; + // } + // } + /// Handle a connected and authenticated client. pub async fn handle(&mut self) -> Result<(), Error> { // The client wants to cancel a query it has issued previously. @@ -414,8 +608,8 @@ impl Drop for Client { +impl Drop for Client { fn drop(&mut self) { // Update statistics. if let Some(address_id) = self.last_address_id { diff --git a/src/main.rs b/src/main.rs index 05bfb23..439c01a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,13 +28,13 @@ extern crate log; extern crate md5; extern crate num_cpus; extern crate once_cell; +extern crate rustls_pemfile; extern crate serde; extern crate serde_derive; extern crate sqlparser; extern crate tokio; -extern crate toml; extern crate tokio_rustls; -extern crate rustls_pemfile; +extern crate toml; use log::{debug, error, info}; use parking_lot::Mutex; @@ -45,6 +45,11 @@ use tokio::{ sync::mpsc, }; +use tokio::net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, +}; + use std::collections::HashMap; use std::sync::Arc; @@ -62,6 +67,7 @@ mod sharding; mod stats; mod stream; +use crate::constants::*; use config::{get_config, reload_config}; use pool::{ClientServerMap, ConnectionPool}; use stats::{Collector, Reporter, REPORTER}; @@ -153,32 +159,45 @@ async fn main() { // Handle client. tokio::task::spawn(async move { let start = chrono::offset::Utc::now().naive_utc(); - match client::Client::startup(socket, client_server_map).await { - Ok(mut client) => { - info!("Client {:?} connected", addr); - - match client.handle().await { - Ok(()) => { - let duration = chrono::offset::Utc::now().naive_utc() - start; - - info!( - "Client {:?} disconnected, session duration: {}", - addr, - format_duration(&duration) - ); - } - - Err(err) => { - error!("Client disconnected with error: {:?}", err); - client.release(); - } - } - } + // match client::get_startup(&mut socket) { + // Ok((code, bytes)) => match code { + // SSL_REQUEST_CODE => client::Client::tls_startup< + // } + // } + match client::client_loop(socket, client_server_map).await { + Ok(_) => (), Err(err) => { debug!("Client failed to login: {:?}", err); } }; + + // match client::Client::::startup(socket, client_server_map).await { + // Ok(mut client) => { + // info!("Client {:?} connected", addr); + + // match client.handle().await { + // Ok(()) => { + // let duration = chrono::offset::Utc::now().naive_utc() - start; + + // info!( + // "Client {:?} disconnected, session duration: {}", + // addr, + // format_duration(&duration) + // ); + // } + + // Err(err) => { + // error!("Client disconnected with error: {:?}", err); + // client.release(); + // } + // } + // } + + // Err(err) => { + // debug!("Client failed to login: {:?}", err); + // } + // }; }); } }); diff --git a/src/messages.rs b/src/messages.rs index 960a3b6..7b04792 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -31,7 +31,9 @@ impl From<&DataType> for i32 { /// Tell the client that authentication handshake completed successfully. pub async fn auth_ok(stream: &mut S) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut auth_ok = BytesMut::with_capacity(9); auth_ok.put_u8(b'R'); @@ -43,7 +45,9 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin { /// Generate md5 password challenge. pub async fn md5_challenge(stream: &mut S) -> Result<[u8; 4], Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ // let mut rng = rand::thread_rng(); let salt: [u8; 4] = [ rand::random(), @@ -69,7 +73,9 @@ pub async fn backend_key_data( backend_id: i32, secret_key: i32, ) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut key_data = BytesMut::from(&b"K"[..]); key_data.put_i32(12); key_data.put_i32(backend_id); @@ -91,7 +97,9 @@ pub fn simple_query(query: &str) -> BytesMut { /// Tell the client we're ready for another query. pub async fn ready_for_query(stream: &mut S) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut bytes = BytesMut::with_capacity(5); bytes.put_u8(b'Z'); @@ -215,7 +223,9 @@ pub async fn md5_password( password: &str, salt: &[u8], ) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let password = md5_hash_password(user, password, salt); let mut message = BytesMut::with_capacity(password.len() as usize + 5); @@ -230,11 +240,10 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin { /// Implements a response to our custom `SET SHARDING KEY` /// and `SET SERVER ROLE` commands. /// This tells the client we're ready for the next query. -pub async fn custom_protocol_response_ok( - stream: &mut S, - message: &str, -) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +pub async fn custom_protocol_response_ok(stream: &mut S, message: &str) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut res = BytesMut::with_capacity(25); let set_complete = BytesMut::from(&format!("{}\0", message)[..]); @@ -257,7 +266,9 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin { /// Tell the client we are ready for the next query and no rollback is necessary. /// Docs on error codes: . pub async fn error_response(stream: &mut S, message: &str) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut error = BytesMut::new(); // Error level @@ -299,7 +310,9 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin { } pub async fn wrong_password(stream: &mut S, user: &str) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ let mut error = BytesMut::new(); // Error level @@ -333,11 +346,10 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin { } /// Respond to a SHOW SHARD command. -pub async fn show_response( - stream: &mut OwnedWriteHalf, - name: &str, - value: &str, -) -> Result<(), Error> { +pub async fn show_response(stream: &mut S, name: &str, value: &str) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ // A SELECT response consists of: // 1. RowDescription // 2. One or more DataRow @@ -439,7 +451,9 @@ pub fn command_complete(command: &str) -> BytesMut { /// Write all data in the buffer to the TcpStream. pub async fn write_all(stream: &mut S, buf: BytesMut) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ match stream.write_all(&buf).await { Ok(_) => Ok(()), Err(_) => return Err(Error::SocketError), @@ -448,7 +462,9 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin { /// Write all the data in the buffer to the TcpStream, write owned half (see mpsc). pub async fn write_all_half(stream: &mut S, buf: BytesMut) -> Result<(), Error> -where S: tokio::io::AsyncWrite + std::marker::Unpin { +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ match stream.write_all(&buf).await { Ok(_) => Ok(()), Err(_) => return Err(Error::SocketError), @@ -456,7 +472,10 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin { } /// Read a complete message from the socket. -pub async fn read_message(stream: &mut BufReader) -> Result { +pub async fn read_message(stream: &mut S) -> Result +where + S: tokio::io::AsyncRead + std::marker::Unpin, +{ let code = match stream.read_u8().await { Ok(code) => code, Err(_) => return Err(Error::SocketError), diff --git a/src/stream.rs b/src/stream.rs index 41d3cd1..9c19b89 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,17 +1,17 @@ // Stream wrapper. use bytes::{Buf, BufMut, BytesMut}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, split, ReadHalf, WriteHalf}; +use rustls_pemfile::{certs, rsa_private_keys}; +use std::path::Path; +use std::sync::Arc; +use tokio::io::{split, AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpStream, }; -use tokio_rustls::server::TlsStream; -use rustls_pemfile::{certs, rsa_private_keys}; use tokio_rustls::rustls::{self, Certificate, PrivateKey}; +use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; -use std::sync::Arc; -use std::path::Path; use crate::config::get_config; use crate::errors::Error; @@ -29,132 +29,91 @@ fn load_keys(path: &std::path::Path) -> std::io::Result> { .map(|mut keys| keys.drain(..).map(PrivateKey).collect()) } -struct Tls { - acceptor: TlsAcceptor, +pub struct Tls { + pub acceptor: TlsAcceptor, } impl Tls { - pub fn new() -> Result { - let config = get_config(); + pub fn new() -> Result { + let config = get_config(); - let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) { - Ok(certs) => certs, - Err(_) => return Err(Error::TlsError), - }; + let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) { + Ok(certs) => certs, + Err(_) => return Err(Error::TlsError), + }; - let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) { - Ok(keys) => keys, - Err(_) => return Err(Error::TlsError), - }; + let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) { + Ok(keys) => keys, + Err(_) => return Err(Error::TlsError), + }; - let config = match rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certs, keys.remove(0)) - .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err)) { - Ok(c) => c, - Err(_) => return Err(Error::TlsError) + let config = match rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, keys.remove(0)) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err)) + { + Ok(c) => c, + Err(_) => return Err(Error::TlsError), }; Ok(Tls { - acceptor: TlsAcceptor::from(Arc::new(config)), + acceptor: TlsAcceptor::from(Arc::new(config)), }) - } + } } struct Stream { - read: Option>, - write: Option, - tls_read: Option>>>, - tls_write: Option>>, + read: Option>, + write: Option, + tls_read: Option>>>, + tls_write: Option>>, } - impl Stream { - pub async fn new(stream: TcpStream, tls: Option) -> Result { + pub async fn new(stream: TcpStream, tls: Option) -> Result { + let config = get_config(); - let config = get_config(); + match tls { + None => { + let (read, write) = stream.into_split(); + let read = BufReader::new(read); + Ok(Self { + read: Some(read), + write: Some(write), + tls_read: None, + tls_write: None, + }) + } - match tls { - None => { - let (read, write) = stream.into_split(); - let read = BufReader::new(read); - Ok( - Self { - read: Some(read), - write: Some(write), - tls_read: None, - tls_write: None, - } - ) - } + Some(tls) => { + let mut tls_stream = match tls.acceptor.accept(stream).await { + Ok(stream) => stream, + Err(_) => return Err(Error::TlsError), + }; - Some(tls) => { - let mut tls_stream = match tls.acceptor.accept(stream).await { - Ok(stream) => stream, - Err(_) => return Err(Error::TlsError), - }; + let (read, write) = split(tls_stream); - let (read, write) = split(tls_stream); + Ok(Self { + read: None, + write: None, + tls_read: Some(BufReader::new(read)), + tls_write: Some(write), + }) + } + } + } +} - Ok(Self{ - read: None, - write: None, - tls_read: Some(BufReader::new(read)), - tls_write: Some(write), - }) - } - } - } - - async fn read(stream: &mut S) -> Result - where S: tokio::io::AsyncRead + std::marker::Unpin { - - let code = match stream.read_u8().await { - Ok(code) => code, - Err(_) => return Err(Error::SocketError), - }; - - let len = match stream.read_i32().await { - Ok(len) => len, - Err(_) => return Err(Error::SocketError), - }; - - let mut buf = vec![0u8; len as usize - 4]; - - match stream.read_exact(&mut buf).await { - Ok(_) => (), - Err(_) => return Err(Error::SocketError), - }; - - let mut bytes = BytesMut::with_capacity(len as usize + 1); - - bytes.put_u8(code); - bytes.put_i32(len); - bytes.put_slice(&buf); - - Ok(bytes) - } - - async fn write(stream: &mut S, buf: &BytesMut) -> Result<(), Error> - where S: tokio::io::AsyncWrite + std::marker::Unpin { - match stream.write_all(buf).await { - Ok(_) => Ok(()), - Err(_) => return Err(Error::SocketError), - } - } - - pub async fn read_message(&mut self) -> Result { - match &self.read { - Some(read) => Self::read(self.read.as_mut().unwrap()).await, - None => Self::read(self.tls_read.as_mut().unwrap()).await, - } - } - - pub async fn write_all(&mut self, buf: &BytesMut) -> Result<(), Error> { - match &self.write { - Some(write) => Self::write(self.write.as_mut().unwrap(), buf).await, - None => Self::write(self.tls_write.as_mut().unwrap(), buf).await, - } - } -} \ No newline at end of file +// impl tokio::io::AsyncRead for Stream { +// fn poll_read( +// mut self: core::pin::Pin<&mut Self>, +// cx: &mut core::task::Context<'_>, +// buf: &mut tokio::io::ReadBuf<'_> +// ) -> core::task::Poll> { +// match &mut self.get_mut().tls_read { +// None => core::pin::Pin::new(self.read.as_mut().unwrap()).poll_read(cx, buf), +// Some(mut tls) => core::pin::Pin::new(&mut tls).poll_read(cx, buf), +// } +// } +// } From 8f3202ed9221856a490cdb36dfbf6e9341ced6ba Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 16:45:41 -0700 Subject: [PATCH 3/9] hmm --- pgcat.toml | 4 +- src/client.rs | 334 ++++++++++++++++++++++-------------------------- src/main.rs | 55 ++------ src/messages.rs | 7 +- src/stream.rs | 119 ----------------- src/tls.rs | 57 +++++++++ 6 files changed, 226 insertions(+), 350 deletions(-) create mode 100644 src/tls.rs diff --git a/pgcat.toml b/pgcat.toml index b2a6b76..e9dbf07 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -33,8 +33,8 @@ ban_time = 60 # Seconds autoreload = false # TLS -tls_certificate = "server.cert" -tls_private_key = "server.key" +# tls_certificate = "server.cert" +# tls_private_key = "server.key" # # User to use for authentication against the server. diff --git a/src/client.rs b/src/client.rs index 4e8ba73..358982f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,12 +1,9 @@ /// Handle clients by pretending to be a PostgreSQL server. use bytes::{Buf, BufMut, BytesMut}; -use log::{debug, error, trace}; +use log::{debug, error, trace, info}; use std::collections::HashMap; -use tokio::io::{split, AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; +use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf}; +use tokio::net::TcpStream; use crate::admin::handle_admin; use crate::config::get_config; @@ -17,7 +14,7 @@ use crate::pool::{get_pool, ClientServerMap}; use crate::query_router::{Command, QueryRouter}; use crate::server::Server; use crate::stats::{get_reporter, Reporter}; -use crate::stream::Tls; +use crate::tls::Tls; use tokio_rustls::server::TlsStream; @@ -41,6 +38,9 @@ pub struct Client { /// them to the backend. buffer: BytesMut, + /// Address + addr: std::net::SocketAddr, + /// The client was started with the sole reason to cancel another running query. cancel_mode: bool, @@ -73,35 +73,107 @@ pub struct Client { last_server_id: Option, } -/// Main client loop. -pub async fn client_loop( +/// Client entrypoint. +pub async fn client_entrypoint( mut stream: TcpStream, client_server_map: ClientServerMap, ) -> Result<(), Error> { + // Figure out if the client wants TLS or not. + let addr = stream.peer_addr().unwrap(); + match get_startup::(&mut stream).await { - Ok((ClientConnectionType::Tls, bytes)) => { - match startup_tls(stream, client_server_map).await { - Ok(mut client) => client.handle().await, - Err(err) => Err(err), + + // Client requested a TLS connection. + Ok((ClientConnectionType::Tls, _)) => { + let config = get_config(); + + // TLS settings are configured, will setup TLS now. + if config.general.tls_certificate != None { + debug!("Accepting TLS request"); + + let mut yes = BytesMut::new(); + yes.put_u8(b'S'); + write_all(&mut stream, yes).await?; + + // Negotiate TLS. + match startup_tls(stream, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} connected (TLS)", addr); + + client.handle().await + } + Err(err) => Err(err), + } + } + + // TLS is not configured, we cannot offer it. + else { + // Rejecting client request for TLS. + let mut no = BytesMut::new(); + no.put_u8(b'N'); + write_all(&mut stream, no).await?; + + // Attempting regular startup. Client can disconnect now + // if they choose. + match get_startup::(&mut stream).await { + // Client accepted unencrypted connection. + Ok((ClientConnectionType::Startup, bytes)) => { + let (read, write) = split(stream); + + // Continue with regular startup. + match Client::startup(read, write, addr, bytes, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} connected (plain)", addr); + + client.handle().await + } + Err(err) => Err(err), + } + } + + // Client probably disconnected rejecting our plain text connection. + _ => Err(Error::ProtocolSyncError), + } } } + // Client wants to use plain connection without encryption. Ok((ClientConnectionType::Startup, bytes)) => { let (read, write) = split(stream); - match Client::handle_startup(read, write, bytes, client_server_map).await { - Ok(mut client) => client.handle().await, + + // Continue with regular startup. + match Client::startup(read, write, addr, bytes, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} connected (plain)", addr); + + client.handle().await + } Err(err) => Err(err), } } + // Client wants to cancel a query. Ok((ClientConnectionType::CancelQuery, bytes)) => { - return Err(Error::ProtocolSyncError); + let (read, write) = split(stream); + + // Continue with cancel query request. + match Client::cancel(read, write, addr, bytes, client_server_map).await { + Ok(mut client) => { + info!("Client {:?} issued a cancel query request", addr); + + client.handle().await + } + + Err(err) => Err(err), + } } + // Something failed, probably the socket. Err(err) => Err(err), } } +/// Handle the first message the client sends. async fn get_startup(stream: &mut S) -> Result<(ClientConnectionType, BytesMut), Error> where S: tokio::io::AsyncRead + std::marker::Unpin + tokio::io::AsyncWrite, @@ -131,32 +203,45 @@ where // Client is requesting to cancel a running query (plain text connection). CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)), + + // Something else, probably something is wrong and it's not our fault, + // e.g. badly implemented Postgres client. _ => Err(Error::ProtocolSyncError), } } /// Handle TLS connection negotation. pub async fn startup_tls( - mut stream: TcpStream, + stream: TcpStream, client_server_map: ClientServerMap, ) -> Result>, WriteHalf>>, Error> { - // Accept SSL request if SSL is configured. - let mut yes = BytesMut::new(); - yes.put_u8(b'S'); - write_all(&mut stream, yes).await?; - // Negotiate TLS. - let mut tls = Tls::new().unwrap(); + let tls = Tls::new()?; + let addr = stream.peer_addr().unwrap(); + let mut stream = match tls.acceptor.accept(stream).await { Ok(stream) => stream, - Err(_) => return Err(Error::TlsError), + + // TLS negotitation failed. + Err(err) => { + error!("TLS negotiation failed: {:?}", err); + return Err(Error::TlsError) + } }; + // TLS negotitation successful. + // Continue with regular startup using encrypted connection. match get_startup::>(&mut stream).await { + + // Got good startup message, proceeding like normal except we + // are encrypted now. Ok((ClientConnectionType::Startup, bytes)) => { let (read, write) = split(stream); - Client::handle_startup(read, write, bytes, client_server_map).await + + Client::startup(read, write, addr, bytes, client_server_map).await } + + // Bad Postgres client. _ => Err(Error::ProtocolSyncError), } } @@ -166,10 +251,12 @@ where S: tokio::io::AsyncRead + std::marker::Unpin, T: tokio::io::AsyncWrite + std::marker::Unpin, { - // Perform client startup sequence in TLS. - pub async fn handle_startup( + /// Handle Postgres client startup after TLS negotiation is complete + /// or over plain text. + pub async fn startup( mut read: S, mut write: T, + addr: std::net::SocketAddr, bytes: BytesMut, // The rest of the startup message. client_server_map: ClientServerMap, ) -> Result, Error> { @@ -244,6 +331,7 @@ where return Ok(Client { read: BufReader::new(read), write: write, + addr, buffer: BytesMut::with_capacity(8196), cancel_mode: false, transaction_mode: transaction_mode, @@ -258,161 +346,38 @@ where }); } - /// Perform client startup sequence. - /// See docs: - // pub async fn startup( - // mut stream: TcpStream, - // client_server_map: ClientServerMap, - // ) -> Result, WriteHalf>, Error> { - // let config = get_config(); - // let transaction_mode = config.general.pool_mode == "transaction"; - // let stats = get_reporter(); + /// Handle cancel request. + pub async fn cancel( + read: S, + write: T, + addr: std::net::SocketAddr, + mut bytes: BytesMut, // The rest of the startup message. + client_server_map: ClientServerMap, + ) -> Result, Error> { + let process_id = bytes.get_i32(); + let secret_key = bytes.get_i32(); - // loop { - // trace!("Waiting for StartupMessage"); + let config = get_config(); + let transaction_mode = config.general.pool_mode == "transaction"; + let stats = get_reporter(); - // // Could be StartupMessage, SSLRequest or CancelRequest. - // let len = match stream.read_i32().await { - // Ok(len) => len, - // Err(_) => return Err(Error::ClientBadStartup), - // }; - - // let mut startup = vec![0u8; len as usize - 4]; - - // match stream.read_exact(&mut startup).await { - // Ok(_) => (), - // Err(_) => return Err(Error::ClientBadStartup), - // }; - - // let mut bytes = BytesMut::from(&startup[..]); - // let code = bytes.get_i32(); - - // match code { - // // Client wants SSL. We don't support it at the moment. - // SSL_REQUEST_CODE => { - // trace!("Rejecting SSLRequest"); - - // let mut no = BytesMut::with_capacity(1); - // no.put_u8(b'N'); - - // write_all(&mut stream, no).await?; - // } - - // // Regular startup message. - // PROTOCOL_VERSION_NUMBER => { - // trace!("Got StartupMessage"); - // let parameters = parse_startup(bytes.clone())?; - - // // Generate random backend ID and secret key - // let process_id: i32 = rand::random(); - // let secret_key: i32 = rand::random(); - - // // Perform MD5 authentication. - // // TODO: Add SASL support. - // let salt = md5_challenge(&mut stream).await?; - - // let code = match stream.read_u8().await { - // Ok(p) => p, - // Err(_) => return Err(Error::SocketError), - // }; - - // // PasswordMessage - // if code as char != 'p' { - // debug!("Expected p, got {}", code as char); - // return Err(Error::ProtocolSyncError); - // } - - // let len = match stream.read_i32().await { - // Ok(len) => len, - // Err(_) => return Err(Error::SocketError), - // }; - - // let mut password_response = vec![0u8; (len - 4) as usize]; - - // match stream.read_exact(&mut password_response).await { - // Ok(_) => (), - // Err(_) => return Err(Error::SocketError), - // }; - - // // Compare server and client hashes. - // let password_hash = - // md5_hash_password(&config.user.name, &config.user.password, &salt); - - // if password_hash != password_response { - // debug!("Password authentication failed"); - // wrong_password(&mut stream, &config.user.name).await?; - // return Err(Error::ClientError); - // } - - // debug!("Password authentication successful"); - - // auth_ok(&mut stream).await?; - // write_all(&mut stream, get_pool().server_info()).await?; - // backend_key_data(&mut stream, process_id, secret_key).await?; - // ready_for_query(&mut stream).await?; - - // trace!("Startup OK"); - - // let database = parameters - // .get("database") - // .unwrap_or(parameters.get("user").unwrap()); - // let admin = ["pgcat", "pgbouncer"] - // .iter() - // .filter(|db| *db == &database) - // .count() - // == 1; - - // // Split the read and write streams - // // so we can control buffering. - // let (read, write) = split(stream); - - // return Ok(Client { - // read: BufReader::new(read), - // write: write, - // buffer: BytesMut::with_capacity(8196), - // cancel_mode: false, - // transaction_mode: transaction_mode, - // process_id: process_id, - // secret_key: secret_key, - // client_server_map: client_server_map, - // parameters: parameters, - // stats: stats, - // admin: admin, - // last_address_id: None, - // last_server_id: None, - // }); - // } - - // // Query cancel request. - // CANCEL_REQUEST_CODE => { - // let (read, write) = split(stream); - - // let process_id = bytes.get_i32(); - // let secret_key = bytes.get_i32(); - - // return Ok(Client { - // read: BufReader::new(read), - // write: write, - // buffer: BytesMut::with_capacity(8196), - // cancel_mode: true, - // transaction_mode: transaction_mode, - // process_id: process_id, - // secret_key: secret_key, - // client_server_map: client_server_map, - // parameters: HashMap::new(), - // stats: stats, - // admin: false, - // last_address_id: None, - // last_server_id: None, - // }); - // } - - // _ => { - // return Err(Error::ProtocolSyncError); - // } - // }; - // } - // } + return Ok(Client { + read: BufReader::new(read), + write: write, + addr, + buffer: BytesMut::with_capacity(8196), + cancel_mode: true, + transaction_mode: transaction_mode, + process_id: process_id, + secret_key: secret_key, + client_server_map: client_server_map, + parameters: HashMap::new(), + stats: stats, + admin: false, + last_address_id: None, + last_server_id: None, + }); + } /// Handle a connected and authenticated client. pub async fn handle(&mut self) -> Result<(), Error> { @@ -608,8 +573,8 @@ where self.last_server_id = Some(server.process_id()); debug!( - "Client stuff talking to server {:?}", - // self.write.peer_addr().unwrap(), + "Client {:?} talking to server {:?}", + self.addr, server.address() ); @@ -846,6 +811,9 @@ where impl Drop for Client { fn drop(&mut self) { + let mut guard = self.client_server_map.lock(); + guard.remove(&(self.process_id, self.secret_key)); + // Update statistics. if let Some(address_id) = self.last_address_id { self.stats.client_disconnecting(self.process_id, address_id); @@ -854,5 +822,7 @@ impl Drop for Client { self.stats.server_idle(process_id, address_id); } } + + // self.release(); } } diff --git a/src/main.rs b/src/main.rs index 439c01a..7fb0ec6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,11 +45,6 @@ use tokio::{ sync::mpsc, }; -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; - use std::collections::HashMap; use std::sync::Arc; @@ -65,9 +60,8 @@ mod scram; mod server; mod sharding; mod stats; -mod stream; +mod tls; -use crate::constants::*; use config::{get_config, reload_config}; use pool::{ClientServerMap, ConnectionPool}; use stats::{Collector, Reporter, REPORTER}; @@ -159,45 +153,22 @@ async fn main() { // Handle client. tokio::task::spawn(async move { let start = chrono::offset::Utc::now().naive_utc(); - // match client::get_startup(&mut socket) { - // Ok((code, bytes)) => match code { - // SSL_REQUEST_CODE => client::Client::tls_startup< - // } - // } - match client::client_loop(socket, client_server_map).await { - Ok(_) => (), + match client::client_entrypoint(socket, client_server_map).await { + Ok(_) => { + let duration = chrono::offset::Utc::now().naive_utc() - start; + + info!( + "Client {:?} disconnected, session duration: {}", + addr, + format_duration(&duration) + ); + }, + Err(err) => { - debug!("Client failed to login: {:?}", err); + debug!("Client disconnected with error {:?}", err); } }; - - // match client::Client::::startup(socket, client_server_map).await { - // Ok(mut client) => { - // info!("Client {:?} connected", addr); - - // match client.handle().await { - // Ok(()) => { - // let duration = chrono::offset::Utc::now().naive_utc() - start; - - // info!( - // "Client {:?} disconnected, session duration: {}", - // addr, - // format_duration(&duration) - // ); - // } - - // Err(err) => { - // error!("Client disconnected with error: {:?}", err); - // client.release(); - // } - // } - // } - - // Err(err) => { - // debug!("Client failed to login: {:?}", err); - // } - // }; }); } }); diff --git a/src/messages.rs b/src/messages.rs index 7b04792..89795c6 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -2,11 +2,8 @@ /// and handle TcpStream (TCP socket). use bytes::{Buf, BufMut, BytesMut}; use md5::{Digest, Md5}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; use crate::errors::Error; use std::collections::HashMap; diff --git a/src/stream.rs b/src/stream.rs index 9c19b89..e69de29 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,119 +0,0 @@ -// Stream wrapper. - -use bytes::{Buf, BufMut, BytesMut}; -use rustls_pemfile::{certs, rsa_private_keys}; -use std::path::Path; -use std::sync::Arc; -use tokio::io::{split, AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; -use tokio_rustls::rustls::{self, Certificate, PrivateKey}; -use tokio_rustls::server::TlsStream; -use tokio_rustls::TlsAcceptor; - -use crate::config::get_config; -use crate::errors::Error; - -// TLS -fn load_certs(path: &std::path::Path) -> std::io::Result> { - certs(&mut std::io::BufReader::new(std::fs::File::open(path)?)) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert")) - .map(|mut certs| certs.drain(..).map(Certificate).collect()) -} - -fn load_keys(path: &std::path::Path) -> std::io::Result> { - rsa_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?)) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid key")) - .map(|mut keys| keys.drain(..).map(PrivateKey).collect()) -} - -pub struct Tls { - pub acceptor: TlsAcceptor, -} - -impl Tls { - pub fn new() -> Result { - let config = get_config(); - - let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) { - Ok(certs) => certs, - Err(_) => return Err(Error::TlsError), - }; - - let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) { - Ok(keys) => keys, - Err(_) => return Err(Error::TlsError), - }; - - let config = match rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certs, keys.remove(0)) - .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err)) - { - Ok(c) => c, - Err(_) => return Err(Error::TlsError), - }; - - Ok(Tls { - acceptor: TlsAcceptor::from(Arc::new(config)), - }) - } -} - -struct Stream { - read: Option>, - write: Option, - tls_read: Option>>>, - tls_write: Option>>, -} - -impl Stream { - pub async fn new(stream: TcpStream, tls: Option) -> Result { - let config = get_config(); - - match tls { - None => { - let (read, write) = stream.into_split(); - let read = BufReader::new(read); - Ok(Self { - read: Some(read), - write: Some(write), - tls_read: None, - tls_write: None, - }) - } - - Some(tls) => { - let mut tls_stream = match tls.acceptor.accept(stream).await { - Ok(stream) => stream, - Err(_) => return Err(Error::TlsError), - }; - - let (read, write) = split(tls_stream); - - Ok(Self { - read: None, - write: None, - tls_read: Some(BufReader::new(read)), - tls_write: Some(write), - }) - } - } - } -} - -// impl tokio::io::AsyncRead for Stream { -// fn poll_read( -// mut self: core::pin::Pin<&mut Self>, -// cx: &mut core::task::Context<'_>, -// buf: &mut tokio::io::ReadBuf<'_> -// ) -> core::task::Poll> { -// match &mut self.get_mut().tls_read { -// None => core::pin::Pin::new(self.read.as_mut().unwrap()).poll_read(cx, buf), -// Some(mut tls) => core::pin::Pin::new(&mut tls).poll_read(cx, buf), -// } -// } -// } diff --git a/src/tls.rs b/src/tls.rs new file mode 100644 index 0000000..ab9fc40 --- /dev/null +++ b/src/tls.rs @@ -0,0 +1,57 @@ +// Stream wrapper. + +use rustls_pemfile::{certs, rsa_private_keys}; +use std::path::Path; +use std::sync::Arc; +use tokio_rustls::rustls::{self, Certificate, PrivateKey}; +use tokio_rustls::TlsAcceptor; + +use crate::config::get_config; +use crate::errors::Error; + +// TLS +fn load_certs(path: &Path) -> std::io::Result> { + certs(&mut std::io::BufReader::new(std::fs::File::open(path)?)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert")) + .map(|mut certs| certs.drain(..).map(Certificate).collect()) +} + +fn load_keys(path: &Path) -> std::io::Result> { + rsa_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid key")) + .map(|mut keys| keys.drain(..).map(PrivateKey).collect()) +} + +pub struct Tls { + pub acceptor: TlsAcceptor, +} + +impl Tls { + pub fn new() -> Result { + let config = get_config(); + + let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) { + Ok(certs) => certs, + Err(_) => return Err(Error::TlsError), + }; + + let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) { + Ok(keys) => keys, + Err(_) => return Err(Error::TlsError), + }; + + let config = match rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, keys.remove(0)) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err)) + { + Ok(c) => c, + Err(_) => return Err(Error::TlsError), + }; + + Ok(Tls { + acceptor: TlsAcceptor::from(Arc::new(config)), + }) + } +} From c11d595ac7969697ed396ef60bf24cd28d6d2c05 Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 16:46:03 -0700 Subject: [PATCH 4/9] bye --- src/stream.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/stream.rs diff --git a/src/stream.rs b/src/stream.rs deleted file mode 100644 index e69de29..0000000 From 7667fefeadbb96be11f8af2b813f6984b444285b Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 17:01:14 -0700 Subject: [PATCH 5/9] config check --- src/config.rs | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/tls.rs | 4 ++-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 1700541..0e4b8d2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,9 +8,11 @@ use std::sync::Arc; use tokio::fs::File; use tokio::io::AsyncReadExt; use toml; +use std::path::Path; use crate::errors::Error; use crate::{ClientServerMap, ConnectionPool}; +use crate::tls::{load_certs, load_keys}; /// Globally available configuration. static CONFIG: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Config::default())); @@ -253,6 +255,25 @@ impl Config { info!("Primary reads: {}", self.query_router.primary_reads_enabled); info!("Query router: {}", self.query_router.query_parser_enabled); info!("Number of shards: {}", self.shards.len()); + + match self.general.tls_certificate.clone() { + Some(tls_certificate) => { + info!("TLS certificate: {}", tls_certificate); + + match self.general.tls_private_key.clone() { + Some(tls_private_key) => { + info!("TLS private key: {}", tls_private_key); + info!("TLS support is enabled"); + }, + + None => (), + } + } + + None => { + info!("TLS support is disabled"); + }, + }; } } @@ -372,6 +393,39 @@ pub async fn parse(path: &str) -> Result<(), Error> { } }; + // Validate TLS! + match config.general.tls_certificate.clone() { + Some(tls_certificate) => { + match load_certs(&Path::new(&tls_certificate)) { + Ok(_) => { + // Cert is okay, but what about the private key? + match config.general.tls_private_key.clone() { + Some(tls_private_key) => { + match load_keys(&Path::new(&tls_private_key)) { + Ok(_) => (), + Err(err) => { + error!("tls_private_key is incorrectly configured: {:?}", err); + return Err(Error::BadConfig); + } + } + } + + None => { + error!("tls_certificate is set, but the tls_private_key is not"); + return Err(Error::BadConfig); + } + }; + } + + Err(err) => { + error!("tls_certificate is incorrectly configured: {:?}", err); + return Err(Error::BadConfig); + } + } + }, + None => (), + }; + config.path = path.to_string(); // Update the configuration globally. diff --git a/src/tls.rs b/src/tls.rs index ab9fc40..3bc4a6a 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -10,13 +10,13 @@ use crate::config::get_config; use crate::errors::Error; // TLS -fn load_certs(path: &Path) -> std::io::Result> { +pub fn load_certs(path: &Path) -> std::io::Result> { certs(&mut std::io::BufReader::new(std::fs::File::open(path)?)) .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert")) .map(|mut certs| certs.drain(..).map(Certificate).collect()) } -fn load_keys(path: &Path) -> std::io::Result> { +pub fn load_keys(path: &Path) -> std::io::Result> { rsa_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?)) .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid key")) .map(|mut keys| keys.drain(..).map(PrivateKey).collect()) From 186f8be5b38a82364c804d6d5eb12c585e91a1d2 Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 17:01:40 -0700 Subject: [PATCH 6/9] lint --- src/client.rs | 7 ++----- src/config.rs | 24 +++++++++++------------- src/main.rs | 2 +- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/client.rs b/src/client.rs index 358982f..05895b6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ /// Handle clients by pretending to be a PostgreSQL server. use bytes::{Buf, BufMut, BytesMut}; -use log::{debug, error, trace, info}; +use log::{debug, error, info, trace}; use std::collections::HashMap; use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf}; use tokio::net::TcpStream; @@ -82,7 +82,6 @@ pub async fn client_entrypoint( let addr = stream.peer_addr().unwrap(); match get_startup::(&mut stream).await { - // Client requested a TLS connection. Ok((ClientConnectionType::Tls, _)) => { let config = get_config(); @@ -105,7 +104,6 @@ pub async fn client_entrypoint( Err(err) => Err(err), } } - // TLS is not configured, we cannot offer it. else { // Rejecting client request for TLS. @@ -225,14 +223,13 @@ pub async fn startup_tls( // TLS negotitation failed. Err(err) => { error!("TLS negotiation failed: {:?}", err); - return Err(Error::TlsError) + return Err(Error::TlsError); } }; // TLS negotitation successful. // Continue with regular startup using encrypted connection. match get_startup::>(&mut stream).await { - // Got good startup message, proceeding like normal except we // are encrypted now. Ok((ClientConnectionType::Startup, bytes)) => { diff --git a/src/config.rs b/src/config.rs index 0e4b8d2..da59d2a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,15 +4,15 @@ use log::{error, info}; use once_cell::sync::Lazy; use serde_derive::Deserialize; use std::collections::{HashMap, HashSet}; +use std::path::Path; use std::sync::Arc; use tokio::fs::File; use tokio::io::AsyncReadExt; use toml; -use std::path::Path; use crate::errors::Error; -use crate::{ClientServerMap, ConnectionPool}; use crate::tls::{load_certs, load_keys}; +use crate::{ClientServerMap, ConnectionPool}; /// Globally available configuration. static CONFIG: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Config::default())); @@ -264,7 +264,7 @@ impl Config { Some(tls_private_key) => { info!("TLS private key: {}", tls_private_key); info!("TLS support is enabled"); - }, + } None => (), } @@ -272,7 +272,7 @@ impl Config { None => { info!("TLS support is disabled"); - }, + } }; } } @@ -400,15 +400,13 @@ pub async fn parse(path: &str) -> Result<(), Error> { Ok(_) => { // Cert is okay, but what about the private key? match config.general.tls_private_key.clone() { - Some(tls_private_key) => { - match load_keys(&Path::new(&tls_private_key)) { - Ok(_) => (), - Err(err) => { - error!("tls_private_key is incorrectly configured: {:?}", err); - return Err(Error::BadConfig); - } + Some(tls_private_key) => match load_keys(&Path::new(&tls_private_key)) { + Ok(_) => (), + Err(err) => { + error!("tls_private_key is incorrectly configured: {:?}", err); + return Err(Error::BadConfig); } - } + }, None => { error!("tls_certificate is set, but the tls_private_key is not"); @@ -422,7 +420,7 @@ pub async fn parse(path: &str) -> Result<(), Error> { return Err(Error::BadConfig); } } - }, + } None => (), }; diff --git a/src/main.rs b/src/main.rs index 7fb0ec6..49795b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -163,7 +163,7 @@ async fn main() { addr, format_duration(&duration) ); - }, + } Err(err) => { debug!("Client disconnected with error {:?}", err); From 21bf07258ca2987960c74730a999be12a4281fce Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 17:05:45 -0700 Subject: [PATCH 7/9] lock em up --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b193e77..3f65e90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,5 +29,5 @@ hmac = "0.12" sha2 = "0.10" base64 = "0.13" stringprep = "0.1" -tokio-rustls = "*" -rustls-pemfile = "*" +tokio-rustls = "0.23" +rustls-pemfile = "1" From 773602dedf90bb1e84324aa3e31934767b125ae3 Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 17:06:49 -0700 Subject: [PATCH 8/9] Im about to get a nasty email --- .circleci/pgcat.toml | 3 +++ .circleci/server.cert | 21 +++++++++++++++++++++ .circleci/server.key | 28 ++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) create mode 100644 .circleci/server.cert create mode 100644 .circleci/server.key diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index d7249f1..943eff9 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -32,6 +32,9 @@ ban_time = 60 # Seconds # autoreload = true +tls_certificate = ".circleci/server.cert" +tls_private_key = ".circleci/server.key" + # # User to use for authentication against the server. [user] diff --git a/.circleci/server.cert b/.circleci/server.cert new file mode 100644 index 0000000..a24847a --- /dev/null +++ b/.circleci/server.cert @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUChIvUGFJGJe5EDch32rchqoxER0wDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMjA2MjcyMjI2MDZaFw0yMjA3 +MjcyMjI2MDZaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDdTwrBzV1v79faVckFvIn/9V4fypYs4vDi3X+h3wGn +AjEh6mmizlKCwSwAam07D9Q5zKiXFrzNJqzSioOv5zsOAvObwrnzbtKSwfs3aP5g +eEh2clHCZYx9p06WszPcgSB5nTz1NeY4XAwvGn3A+SVCLyPMTNwnem48+ONh2F9u +FHtSuIsEVvTjMlH09O7LjwJlODxy3HNv2JHYM5Hx9tzc+NVYdERPtaVcX8ycw1Eh +9hgGSgfaNM52/JfRMIDhENrsn0S1omRUtcJe72loreiwrECUOLAnAfp9Xqc+rMPP +aLA6ElzmYef1+ZEC0p6isCHPhxY5ESVhKYhE9nQvksjnAgMBAAGjUzBRMB0GA1Ud +DgQWBBQLDtzexqjx7xPtUZuZB/angU9oSDAfBgNVHSMEGDAWgBQLDtzexqjx7xPt +UZuZB/angU9oSDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQC/ +mxY/a/WeLENVj2Gg9EUH0CKzfqeTey1mb6YfPGxzrD7oq1m0Vn2MmTbjZrJgh/Ob +QckO3ElF4kC9+6XP+iDPmabGpjeLgllBboT5l2aqnD1syMrf61WPLzgRzRfplYGy +cjBQDDKPu8Lu0QRMWU28tHYN0bMxJoCuXysGGX5WsuFnKCA6f/V+nycJJXxJH3eB +eLjTueD9/RE3OXhi6m8A29Q1E9AE5EF4uRxYXrr91BmYnk4aFvSmBxhUEzE12eSN +lHB/uSc0+Dp+UVmVr6wW8AQfd16UBA0BUf3kSW3aSvirYPYH0rXiOOpEJgOwOMnR +f5+XAbN1Y+3OsFz/ZmP9 +-----END CERTIFICATE----- diff --git a/.circleci/server.key b/.circleci/server.key new file mode 100644 index 0000000..14e4fd6 --- /dev/null +++ b/.circleci/server.key @@ -0,0 +1,28 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDdTwrBzV1v79fa +VckFvIn/9V4fypYs4vDi3X+h3wGnAjEh6mmizlKCwSwAam07D9Q5zKiXFrzNJqzS +ioOv5zsOAvObwrnzbtKSwfs3aP5geEh2clHCZYx9p06WszPcgSB5nTz1NeY4XAwv +Gn3A+SVCLyPMTNwnem48+ONh2F9uFHtSuIsEVvTjMlH09O7LjwJlODxy3HNv2JHY +M5Hx9tzc+NVYdERPtaVcX8ycw1Eh9hgGSgfaNM52/JfRMIDhENrsn0S1omRUtcJe +72loreiwrECUOLAnAfp9Xqc+rMPPaLA6ElzmYef1+ZEC0p6isCHPhxY5ESVhKYhE +9nQvksjnAgMBAAECggEAbnvddO9frFhivJ+DIhgEFQKcIOb0nigV9kx6QYehvYy8 +lp/+aMb0Lk7d9r8rFQdL/icMK5GwZALg2KNKJvEbbF1Q3PwT9VHoUlgBYKJMDEFA +e9GKu7ASuVBjTZzdUUItwkkbe5eS/aQGeSWSjlpTnX0HNCFS72qRymK+scRhsAQf +ZoHyZHDslkvPR3Pos+sndWBYCDHag5/KoPhsMt1+5S9NQcOUHx9Ac0gLHjau3N+P +0FhODHFFGnnpyQvLvj6u3ZOR34ladMgoBglE0O3vPFhckn92EK4teeTWOsUMotiz +qM3QIJTOJjtiY6VDGY93bIa4pFvt7Zi4vIerenKt0QKBgQD/UMFqfevTAMrk10AC +bOa4+cM07ORY4ZwVj5ILhZn+8crDEEtBsUyuEU2FTINtnoEq1yGc/IXpsyS1BHjL +L1xSml5LN3jInbi8z5XQfY5Sj3VOMtwY6yD20jcdeDC44rz3nStXdkcMWxbTMapx +iOPsap5ciUKOMS7LyMidPEG/LQKBgQDd5vHgrLN0FBIIm+vZg6MEm4QyobstVp4l +7V/GZsdL+M8AQv1Rx+5wSUSWKomOIv5lglis7f6g0c9O7Qkr78/wzoyoKC2RRqPp +I90GjY2Iv22N4GIkRrDAgMZbkTitzIB6tbXEVeLAOh3frFJ8IwauRCOiXIjrZdJ4 +FvV86+nU4wKBgQDdWTP2kWkMrBk7QOp7r9Jv+AmnLuHhtOdPQgOJ/bA++X2ik9PL +Bl3GY7XjpSwks1CkxZKcucmXjPp7/X6EGXFfI/owF82dkDADca0e7lufdERtIWb0 +K5WOpz2lTPhgsiLGQfq7fw2lxqsJOnvcpqOD6gOVkmKjSDyb7F0RBJazmQKBgQDD +a8PQTcesjpBjLI3EfX1vbVY7ENu6zfFxDV+vZoxVh8UlQdm90AlYse3JIaUKnB7W +Xrihcucv0hZ0N6RAIW5LcFvHK7sVmdR4WbEpODhRGeTtcZJ8yBSZM898jKQRy2vK +pYRyaADNsWDlvujVkjMr/a40KrIaPQ3h3LZNUaYYaQKBgQD1x8A5S5SiE1cN1vFr +aACkmA2WqEDKKhUsUigJdwW6WB/B9kWlIlz/iV1H9uwBXtSIYG4VqCSTAvh0z4gX +Qu2SrdPm5PYnKzpdynpz78OnGdflD1RKWFGHItR6GN6tj/VmulO6mlFvT4jzBQ7j ++Hf8m2TcD4U3ksz3xw+YOD+cmA== +-----END RSA PRIVATE KEY----- From 8bcfbed57438b2e2f280241e3c2e01c06907696e Mon Sep 17 00:00:00 2001 From: Lev Date: Mon, 27 Jun 2022 17:07:40 -0700 Subject: [PATCH 9/9] forgotten comment --- src/admin.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/admin.rs b/src/admin.rs index 622c14a..74acf15 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -2,7 +2,6 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{info, trace}; use std::collections::HashMap; -// use tokio::net::tcp::T; use crate::config::{get_config, reload_config}; use crate::errors::Error;