mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-24 17:56:29 +00:00
Finish up TLS
This commit is contained in:
161
src/server.rs
161
src/server.rs
@@ -10,14 +10,11 @@ use std::io::Read;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use tokio::io::{split, AsyncRead, AsyncReadExt, AsyncWrite, BufReader, ReadHalf, WriteHalf};
|
||||
use tokio::net::{
|
||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||
TcpStream,
|
||||
};
|
||||
use tokio_rustls::rustls::ClientConfig;
|
||||
use tokio_rustls::{TlsConnector, TlsStream};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
|
||||
use tokio_rustls::{client::TlsStream, TlsConnector};
|
||||
|
||||
use crate::config::{Address, User};
|
||||
use crate::config::{get_config, Address, User};
|
||||
use crate::constants::*;
|
||||
use crate::errors::{Error, ServerIdentifier};
|
||||
use crate::messages::*;
|
||||
@@ -176,33 +173,97 @@ impl Server {
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
// TCP timeouts.
|
||||
configure_socket(&stream);
|
||||
|
||||
// ssl_request(&mut stream).await?;
|
||||
// let response = match stream.read_u8().await {
|
||||
// Ok(response) => response as char,
|
||||
// Err(err) => return Err(Error::SocketError(format!("Server socket error: {:?}", err))),
|
||||
// };
|
||||
let (mut read, mut write) = if get_config().general.server_tls {
|
||||
// Request a TLS connection
|
||||
ssl_request(&mut stream).await?;
|
||||
|
||||
// match response {
|
||||
// 'S' => {
|
||||
// let connector = TlsConnector::from(ClientConfig::builder()
|
||||
// .with_safe_default_cipher_suites()
|
||||
// .with_safe_default_kx_groups()
|
||||
// .with_safe_default_protocol_versions()
|
||||
// .unwrap()
|
||||
// .with_no_client_auth());
|
||||
// connector.connect("test".into(), stream).await.unwrap();
|
||||
// },
|
||||
let response = match stream.read_u8().await {
|
||||
Ok(response) => response as char,
|
||||
Err(err) => {
|
||||
return Err(Error::SocketError(format!(
|
||||
"Server socket error: {:?}",
|
||||
err
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// 'N' => {
|
||||
match response {
|
||||
// Server supports TLS
|
||||
'S' => {
|
||||
let mut root_store = RootCertStore::empty();
|
||||
root_store.add_server_trust_anchors(
|
||||
webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
|
||||
OwnedTrustAnchor::from_subject_spki_name_constraints(
|
||||
ta.subject,
|
||||
ta.spki,
|
||||
ta.name_constraints,
|
||||
)
|
||||
}),
|
||||
);
|
||||
|
||||
// },
|
||||
let mut config = rustls::ClientConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
|
||||
// _ => {
|
||||
// return Err(Error::SocketError("error".into()));
|
||||
// }
|
||||
// };
|
||||
// Equivalent to sslmode=prefer which is fine most places.
|
||||
// If you want verify-full, change `verify_server_certificate` to true.
|
||||
if !get_config().general.verify_server_certificate {
|
||||
let mut dangerous = config.dangerous();
|
||||
dangerous.set_certificate_verifier(Arc::new(
|
||||
crate::tls::NoCertificateVerification {},
|
||||
));
|
||||
}
|
||||
|
||||
let connector = TlsConnector::from(Arc::new(config));
|
||||
let stream = match connector
|
||||
.connect(address.host.as_str().try_into().unwrap(), stream)
|
||||
.await
|
||||
{
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
return Err(Error::SocketError(format!("Server TLS error: {:?}", err)))
|
||||
}
|
||||
};
|
||||
|
||||
let (read, write) = split(stream);
|
||||
(
|
||||
ReadInner::Tls { stream: read },
|
||||
WriteInner::Tls { stream: write },
|
||||
)
|
||||
}
|
||||
|
||||
// Server does not support TLS
|
||||
'N' => {
|
||||
let (read, write) = split(stream);
|
||||
(
|
||||
ReadInner::Plain { stream: read },
|
||||
WriteInner::Plain { stream: write },
|
||||
)
|
||||
}
|
||||
|
||||
// Something else?
|
||||
m => {
|
||||
return Err(Error::SocketError(format!(
|
||||
"Unknown message: {}",
|
||||
m as char
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let (read, write) = split(stream);
|
||||
(
|
||||
ReadInner::Plain { stream: read },
|
||||
WriteInner::Plain { stream: write },
|
||||
)
|
||||
};
|
||||
|
||||
// let (read, write) = split(stream);
|
||||
// let (mut read, mut write) = (ReadInner::Plain { stream: read }, WriteInner::Plain { stream: write });
|
||||
|
||||
trace!("Sending StartupMessage");
|
||||
|
||||
@@ -220,7 +281,7 @@ impl Server {
|
||||
},
|
||||
};
|
||||
|
||||
startup(&mut stream, username, database).await?;
|
||||
startup(&mut write, username, database).await?;
|
||||
|
||||
let mut server_info = BytesMut::new();
|
||||
let mut process_id: i32 = 0;
|
||||
@@ -235,7 +296,7 @@ impl Server {
|
||||
};
|
||||
|
||||
loop {
|
||||
let code = match stream.read_u8().await {
|
||||
let code = match read.read_u8().await {
|
||||
Ok(code) => code as char,
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -245,7 +306,7 @@ impl Server {
|
||||
}
|
||||
};
|
||||
|
||||
let len = match stream.read_i32().await {
|
||||
let len = match read.read_i32().await {
|
||||
Ok(len) => len,
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -261,7 +322,7 @@ impl Server {
|
||||
// Authentication
|
||||
'R' => {
|
||||
// Determine which kind of authentication is required, if any.
|
||||
let auth_code = match stream.read_i32().await {
|
||||
let auth_code = match read.read_i32().await {
|
||||
Ok(auth_code) => auth_code,
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -279,7 +340,7 @@ impl Server {
|
||||
// See: https://www.postgresql.org/docs/12/protocol-message-formats.html
|
||||
let mut salt = vec![0u8; 4];
|
||||
|
||||
match stream.read_exact(&mut salt).await {
|
||||
match read.read_exact(&mut salt).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -292,7 +353,7 @@ impl Server {
|
||||
match password {
|
||||
// Using plaintext password
|
||||
Some(password) => {
|
||||
md5_password(&mut stream, username, password, &salt[..]).await?
|
||||
md5_password(&mut write, username, password, &salt[..]).await?
|
||||
}
|
||||
|
||||
// Using auth passthrough, in this case we should already have a
|
||||
@@ -303,7 +364,7 @@ impl Server {
|
||||
match option_hash {
|
||||
Some(hash) =>
|
||||
md5_password_with_hash(
|
||||
&mut stream,
|
||||
&mut write,
|
||||
&hash,
|
||||
&salt[..],
|
||||
)
|
||||
@@ -337,7 +398,7 @@ impl Server {
|
||||
let sasl_len = (len - 8) as usize;
|
||||
let mut sasl_auth = vec![0u8; sasl_len];
|
||||
|
||||
match stream.read_exact(&mut sasl_auth).await {
|
||||
match read.read_exact(&mut sasl_auth).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -349,7 +410,7 @@ impl Server {
|
||||
|
||||
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
|
||||
|
||||
if sasl_type == SCRAM_SHA_256 {
|
||||
if sasl_type.contains(SCRAM_SHA_256) {
|
||||
debug!("Using {}", SCRAM_SHA_256);
|
||||
|
||||
// Generate client message.
|
||||
@@ -372,7 +433,7 @@ impl Server {
|
||||
res.put_i32(sasl_response.len() as i32);
|
||||
res.put(sasl_response);
|
||||
|
||||
write_all(&mut stream, res).await?;
|
||||
write_all(&mut write, res).await?;
|
||||
} else {
|
||||
error!("Unsupported SCRAM version: {}", sasl_type);
|
||||
return Err(Error::ServerError);
|
||||
@@ -384,7 +445,7 @@ impl Server {
|
||||
|
||||
let mut sasl_data = vec![0u8; (len - 8) as usize];
|
||||
|
||||
match stream.read_exact(&mut sasl_data).await {
|
||||
match read.read_exact(&mut sasl_data).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -403,14 +464,14 @@ impl Server {
|
||||
res.put_i32(4 + sasl_response.len() as i32);
|
||||
res.put(sasl_response);
|
||||
|
||||
write_all(&mut stream, res).await?;
|
||||
write_all(&mut write, res).await?;
|
||||
}
|
||||
|
||||
SASL_FINAL => {
|
||||
trace!("Final SASL");
|
||||
|
||||
let mut sasl_final = vec![0u8; len as usize - 8];
|
||||
match stream.read_exact(&mut sasl_final).await {
|
||||
match read.read_exact(&mut sasl_final).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -445,7 +506,7 @@ impl Server {
|
||||
|
||||
// ErrorResponse
|
||||
'E' => {
|
||||
let error_code = match stream.read_u8().await {
|
||||
let error_code = match read.read_u8().await {
|
||||
Ok(error_code) => error_code,
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -466,7 +527,7 @@ impl Server {
|
||||
// Read the error message without the terminating null character.
|
||||
let mut error = vec![0u8; len as usize - 4 - 1];
|
||||
|
||||
match stream.read_exact(&mut error).await {
|
||||
match read.read_exact(&mut error).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -490,7 +551,7 @@ impl Server {
|
||||
'S' => {
|
||||
let mut param = vec![0u8; len as usize - 4];
|
||||
|
||||
match stream.read_exact(&mut param).await {
|
||||
match read.read_exact(&mut param).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -512,7 +573,7 @@ impl Server {
|
||||
'K' => {
|
||||
// The frontend must save these values if it wishes to be able to issue CancelRequest messages later.
|
||||
// See: <https://www.postgresql.org/docs/12/protocol-message-formats.html>.
|
||||
process_id = match stream.read_i32().await {
|
||||
process_id = match read.read_i32().await {
|
||||
Ok(id) => id,
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -522,7 +583,7 @@ impl Server {
|
||||
}
|
||||
};
|
||||
|
||||
secret_key = match stream.read_i32().await {
|
||||
secret_key = match read.read_i32().await {
|
||||
Ok(id) => id,
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -537,7 +598,7 @@ impl Server {
|
||||
'Z' => {
|
||||
let mut idle = vec![0u8; len as usize - 4];
|
||||
|
||||
match stream.read_exact(&mut idle).await {
|
||||
match read.read_exact(&mut idle).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
return Err(Error::ServerStartupError(
|
||||
@@ -547,12 +608,10 @@ impl Server {
|
||||
}
|
||||
};
|
||||
|
||||
let (read, write) = split(stream);
|
||||
|
||||
let mut server = Server {
|
||||
address: address.clone(),
|
||||
read: BufReader::new(ReadInner::Plain { stream: read }),
|
||||
write: WriteInner::Plain { stream: write },
|
||||
read: BufReader::new(read),
|
||||
write,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
server_info,
|
||||
process_id,
|
||||
|
||||
Reference in New Issue
Block a user