mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-25 10:06:28 +00:00
check
This commit is contained in:
@@ -19,13 +19,13 @@ use crate::server::Server;
|
||||
use crate::stats::{get_reporter, Reporter};
|
||||
|
||||
/// The client state. One of these is created per client.
|
||||
pub struct Client {
|
||||
pub struct Client <T, S> {
|
||||
/// The reads are buffered (8K by default).
|
||||
read: BufReader<OwnedReadHalf>,
|
||||
read: BufReader<T>,
|
||||
|
||||
/// We buffer the writes ourselves because we know the protocol
|
||||
/// better than a stock buffer.
|
||||
write: OwnedWriteHalf,
|
||||
write: S,
|
||||
|
||||
/// Internal buffer, where we place messages until we have to flush
|
||||
/// them to the backend.
|
||||
@@ -63,13 +63,13 @@ pub struct Client {
|
||||
last_server_id: Option<i32>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
impl<T: tokio::io::AsyncRead + std::marker::Unpin, S: tokio::io::AsyncWrite + std::marker::Unpin> Client <T, S> {
|
||||
/// Perform client startup sequence.
|
||||
/// See docs: <https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.3>
|
||||
pub async fn startup(
|
||||
mut stream: TcpStream,
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<Client, Error> {
|
||||
) -> Result<Client<T, S>, Error> {
|
||||
let config = get_config();
|
||||
let transaction_mode = config.general.pool_mode == "transaction";
|
||||
let stats = get_reporter();
|
||||
@@ -650,7 +650,7 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Client {
|
||||
impl<T, S> Drop for Client <T, S> {
|
||||
fn drop(&mut self) {
|
||||
// Update statistics.
|
||||
if let Some(address_id) = self.last_address_id {
|
||||
|
||||
@@ -111,6 +111,8 @@ pub struct General {
|
||||
pub healthcheck_timeout: u64,
|
||||
pub ban_time: i64,
|
||||
pub autoreload: bool,
|
||||
pub tls_certificate: Option<String>,
|
||||
pub tls_private_key: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for General {
|
||||
@@ -124,6 +126,8 @@ impl Default for General {
|
||||
healthcheck_timeout: 1000,
|
||||
ban_time: 60,
|
||||
autoreload: false,
|
||||
tls_certificate: None,
|
||||
tls_private_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,4 +10,5 @@ pub enum Error {
|
||||
BadConfig,
|
||||
AllServersDown,
|
||||
ClientError,
|
||||
TlsError,
|
||||
}
|
||||
|
||||
@@ -33,6 +33,8 @@ extern crate serde_derive;
|
||||
extern crate sqlparser;
|
||||
extern crate tokio;
|
||||
extern crate toml;
|
||||
extern crate tokio_rustls;
|
||||
extern crate rustls_pemfile;
|
||||
|
||||
use log::{debug, error, info};
|
||||
use parking_lot::Mutex;
|
||||
@@ -58,6 +60,7 @@ mod scram;
|
||||
mod server;
|
||||
mod sharding;
|
||||
mod stats;
|
||||
mod stream;
|
||||
|
||||
use config::{get_config, reload_config};
|
||||
use pool::{ClientServerMap, ConnectionPool};
|
||||
|
||||
@@ -30,7 +30,8 @@ impl From<&DataType> for i32 {
|
||||
}
|
||||
|
||||
/// Tell the client that authentication handshake completed successfully.
|
||||
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
pub async fn auth_ok<S>(stream: &mut S) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
let mut auth_ok = BytesMut::with_capacity(9);
|
||||
|
||||
auth_ok.put_u8(b'R');
|
||||
@@ -41,7 +42,8 @@ pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
}
|
||||
|
||||
/// Generate md5 password challenge.
|
||||
pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> {
|
||||
pub async fn md5_challenge<S>(stream: &mut S) -> Result<[u8; 4], Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
// let mut rng = rand::thread_rng();
|
||||
let salt: [u8; 4] = [
|
||||
rand::random(),
|
||||
@@ -62,11 +64,12 @@ pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> {
|
||||
|
||||
/// Give the client the process_id and secret we generated
|
||||
/// used in query cancellation.
|
||||
pub async fn backend_key_data(
|
||||
stream: &mut TcpStream,
|
||||
pub async fn backend_key_data<S>(
|
||||
stream: &mut S,
|
||||
backend_id: i32,
|
||||
secret_key: i32,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
let mut key_data = BytesMut::from(&b"K"[..]);
|
||||
key_data.put_i32(12);
|
||||
key_data.put_i32(backend_id);
|
||||
@@ -87,7 +90,8 @@ pub fn simple_query(query: &str) -> BytesMut {
|
||||
}
|
||||
|
||||
/// Tell the client we're ready for another query.
|
||||
pub async fn ready_for_query(stream: &mut TcpStream) -> Result<(), Error> {
|
||||
pub async fn ready_for_query<S>(stream: &mut S) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
let mut bytes = BytesMut::with_capacity(5);
|
||||
|
||||
bytes.put_u8(b'Z');
|
||||
@@ -205,12 +209,13 @@ pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec<u8> {
|
||||
|
||||
/// Send password challenge response to the server.
|
||||
/// This is the MD5 challenge.
|
||||
pub async fn md5_password(
|
||||
stream: &mut TcpStream,
|
||||
pub async fn md5_password<S>(
|
||||
stream: &mut S,
|
||||
user: &str,
|
||||
password: &str,
|
||||
salt: &[u8],
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
let password = md5_hash_password(user, password, salt);
|
||||
|
||||
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
|
||||
@@ -225,10 +230,11 @@ pub async fn md5_password(
|
||||
/// Implements a response to our custom `SET SHARDING KEY`
|
||||
/// and `SET SERVER ROLE` commands.
|
||||
/// This tells the client we're ready for the next query.
|
||||
pub async fn custom_protocol_response_ok(
|
||||
stream: &mut OwnedWriteHalf,
|
||||
pub async fn custom_protocol_response_ok<S>(
|
||||
stream: &mut S,
|
||||
message: &str,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
let mut res = BytesMut::with_capacity(25);
|
||||
|
||||
let set_complete = BytesMut::from(&format!("{}\0", message)[..]);
|
||||
@@ -250,7 +256,8 @@ pub async fn custom_protocol_response_ok(
|
||||
/// Send a custom error message to the client.
|
||||
/// Tell the client we are ready for the next query and no rollback is necessary.
|
||||
/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
|
||||
pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Result<(), Error> {
|
||||
pub async fn error_response<S>(stream: &mut S, message: &str) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
let mut error = BytesMut::new();
|
||||
|
||||
// Error level
|
||||
@@ -291,7 +298,8 @@ pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Resul
|
||||
Ok(write_all_half(stream, res).await?)
|
||||
}
|
||||
|
||||
pub async fn wrong_password(stream: &mut TcpStream, user: &str) -> Result<(), Error> {
|
||||
pub async fn wrong_password<S>(stream: &mut S, user: &str) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
let mut error = BytesMut::new();
|
||||
|
||||
// Error level
|
||||
@@ -430,7 +438,8 @@ pub fn command_complete(command: &str) -> BytesMut {
|
||||
}
|
||||
|
||||
/// Write all data in the buffer to the TcpStream.
|
||||
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
||||
pub async fn write_all<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
match stream.write_all(&buf).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
@@ -438,7 +447,8 @@ pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Erro
|
||||
}
|
||||
|
||||
/// Write all the data in the buffer to the TcpStream, write owned half (see mpsc).
|
||||
pub async fn write_all_half(stream: &mut OwnedWriteHalf, buf: BytesMut) -> Result<(), Error> {
|
||||
pub async fn write_all_half<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
match stream.write_all(&buf).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
|
||||
160
src/stream.rs
Normal file
160
src/stream.rs
Normal file
@@ -0,0 +1,160 @@
|
||||
// Stream wrapper.
|
||||
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, split, ReadHalf, WriteHalf};
|
||||
use tokio::net::{
|
||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||
TcpStream,
|
||||
};
|
||||
use tokio_rustls::server::TlsStream;
|
||||
use rustls_pemfile::{certs, rsa_private_keys};
|
||||
use tokio_rustls::rustls::{self, Certificate, PrivateKey};
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use std::sync::Arc;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::config::get_config;
|
||||
use crate::errors::Error;
|
||||
|
||||
// TLS
|
||||
fn load_certs(path: &std::path::Path) -> std::io::Result<Vec<Certificate>> {
|
||||
certs(&mut std::io::BufReader::new(std::fs::File::open(path)?))
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))
|
||||
.map(|mut certs| certs.drain(..).map(Certificate).collect())
|
||||
}
|
||||
|
||||
fn load_keys(path: &std::path::Path) -> std::io::Result<Vec<PrivateKey>> {
|
||||
rsa_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?))
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid key"))
|
||||
.map(|mut keys| keys.drain(..).map(PrivateKey).collect())
|
||||
}
|
||||
|
||||
struct Tls {
|
||||
acceptor: TlsAcceptor,
|
||||
}
|
||||
|
||||
impl Tls {
|
||||
pub fn new() -> Result<Self, Error> {
|
||||
let config = get_config();
|
||||
|
||||
let certs = match load_certs(&Path::new(&config.general.tls_certificate.unwrap())) {
|
||||
Ok(certs) => certs,
|
||||
Err(_) => return Err(Error::TlsError),
|
||||
};
|
||||
|
||||
let mut keys = match load_keys(&Path::new(&config.general.tls_private_key.unwrap())) {
|
||||
Ok(keys) => keys,
|
||||
Err(_) => return Err(Error::TlsError),
|
||||
};
|
||||
|
||||
let config = match rustls::ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, keys.remove(0))
|
||||
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err)) {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(Error::TlsError)
|
||||
};
|
||||
|
||||
Ok(Tls {
|
||||
acceptor: TlsAcceptor::from(Arc::new(config)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct Stream {
|
||||
read: Option<BufReader<OwnedReadHalf>>,
|
||||
write: Option<OwnedWriteHalf>,
|
||||
tls_read: Option<BufReader<ReadHalf<TlsStream<TcpStream>>>>,
|
||||
tls_write: Option<WriteHalf<TlsStream<TcpStream>>>,
|
||||
}
|
||||
|
||||
|
||||
impl Stream {
|
||||
pub async fn new(stream: TcpStream, tls: Option<Tls>) -> Result<Stream, Error> {
|
||||
|
||||
let config = get_config();
|
||||
|
||||
match tls {
|
||||
None => {
|
||||
let (read, write) = stream.into_split();
|
||||
let read = BufReader::new(read);
|
||||
Ok(
|
||||
Self {
|
||||
read: Some(read),
|
||||
write: Some(write),
|
||||
tls_read: None,
|
||||
tls_write: None,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
Some(tls) => {
|
||||
let mut tls_stream = match tls.acceptor.accept(stream).await {
|
||||
Ok(stream) => stream,
|
||||
Err(_) => return Err(Error::TlsError),
|
||||
};
|
||||
|
||||
let (read, write) = split(tls_stream);
|
||||
|
||||
Ok(Self{
|
||||
read: None,
|
||||
write: None,
|
||||
tls_read: Some(BufReader::new(read)),
|
||||
tls_write: Some(write),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn read<S>(stream: &mut S) -> Result<BytesMut, Error>
|
||||
where S: tokio::io::AsyncRead + std::marker::Unpin {
|
||||
|
||||
let code = match stream.read_u8().await {
|
||||
Ok(code) => code,
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
let len = match stream.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
let mut buf = vec![0u8; len as usize - 4];
|
||||
|
||||
match stream.read_exact(&mut buf).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
};
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(len as usize + 1);
|
||||
|
||||
bytes.put_u8(code);
|
||||
bytes.put_i32(len);
|
||||
bytes.put_slice(&buf);
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
async fn write<S>(stream: &mut S, buf: &BytesMut) -> Result<(), Error>
|
||||
where S: tokio::io::AsyncWrite + std::marker::Unpin {
|
||||
match stream.write_all(buf).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => return Err(Error::SocketError),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read_message(&mut self) -> Result<BytesMut, Error> {
|
||||
match &self.read {
|
||||
Some(read) => Self::read(self.read.as_mut().unwrap()).await,
|
||||
None => Self::read(self.tls_read.as_mut().unwrap()).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn write_all(&mut self, buf: &BytesMut) -> Result<(), Error> {
|
||||
match &self.write {
|
||||
Some(write) => Self::write(self.write.as_mut().unwrap(), buf).await,
|
||||
None => Self::write(self.tls_write.as_mut().unwrap(), buf).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user