Server TLS (#417)

* Server TLS

* Finish up TLS

* thats it

* diff

* remove dead code

* maybe?

* dirty shutdown

* skip flakey test

* remove unused error

* fetch config once
This commit is contained in:
Lev Kokotov
2023-04-30 09:41:46 -07:00
committed by GitHub
parent 4a87b4807d
commit 0d504032b2
10 changed files with 311 additions and 33 deletions

View File

@@ -9,13 +9,12 @@ use std::collections::HashMap;
use std::io::Read;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
use tokio::net::TcpStream;
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{client::TlsStream, TlsConnector};
use crate::config::{Address, User};
use crate::config::{get_config, Address, User};
use crate::constants::*;
use crate::errors::{Error, ServerIdentifier};
use crate::messages::*;
@@ -23,6 +22,84 @@ use crate::mirrors::MirroringManager;
use crate::pool::ClientServerMap;
use crate::scram::ScramSha256;
use crate::stats::ServerStats;
use std::io::Write;
use pin_project::pin_project;
#[pin_project(project = SteamInnerProj)]
pub enum StreamInner {
Plain {
#[pin]
stream: TcpStream,
},
Tls {
#[pin]
stream: TlsStream<TcpStream>,
},
}
impl AsyncWrite for StreamInner {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
let this = self.project();
match this {
SteamInnerProj::Tls { stream } => stream.poll_write(cx, buf),
SteamInnerProj::Plain { stream } => stream.poll_write(cx, buf),
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
let this = self.project();
match this {
SteamInnerProj::Tls { stream } => stream.poll_flush(cx),
SteamInnerProj::Plain { stream } => stream.poll_flush(cx),
}
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
let this = self.project();
match this {
SteamInnerProj::Tls { stream } => stream.poll_shutdown(cx),
SteamInnerProj::Plain { stream } => stream.poll_shutdown(cx),
}
}
}
impl AsyncRead for StreamInner {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.project();
match this {
SteamInnerProj::Tls { stream } => stream.poll_read(cx, buf),
SteamInnerProj::Plain { stream } => stream.poll_read(cx, buf),
}
}
}
impl StreamInner {
pub fn try_write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match self {
StreamInner::Tls { stream } => {
let r = stream.get_mut();
let mut w = r.1.writer();
w.write(buf)
}
StreamInner::Plain { stream } => stream.try_write(buf),
}
}
}
/// Server state.
pub struct Server {
@@ -30,11 +107,8 @@ pub struct Server {
/// port, e.g. 5432, and role, e.g. primary or replica.
address: Address,
/// Buffered read socket.
read: BufReader<OwnedReadHalf>,
/// Unbuffered write socket (our client code buffers).
write: OwnedWriteHalf,
/// Server TCP connection.
stream: BufStream<StreamInner>,
/// Our server response buffer. We buffer data before we give it to the client.
buffer: BytesMut,
@@ -98,8 +172,88 @@ impl Server {
)));
}
};
// TCP timeouts.
configure_socket(&stream);
let config = get_config();
let mut stream = if config.general.server_tls {
// Request a TLS connection
ssl_request(&mut stream).await?;
let response = match stream.read_u8().await {
Ok(response) => response as char,
Err(err) => {
return Err(Error::SocketError(format!(
"Server socket error: {:?}",
err
)))
}
};
match response {
// Server supports TLS
'S' => {
debug!("Connecting to server using TLS");
let mut root_store = RootCertStore::empty();
root_store.add_server_trust_anchors(
webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
OwnedTrustAnchor::from_subject_spki_name_constraints(
ta.subject,
ta.spki,
ta.name_constraints,
)
}),
);
let mut tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_store)
.with_no_client_auth();
// Equivalent to sslmode=prefer which is fine most places.
// If you want verify-full, change `verify_server_certificate` to true.
if !config.general.verify_server_certificate {
let mut dangerous = tls_config.dangerous();
dangerous.set_certificate_verifier(Arc::new(
crate::tls::NoCertificateVerification {},
));
}
let connector = TlsConnector::from(Arc::new(tls_config));
let stream = match connector
.connect(address.host.as_str().try_into().unwrap(), stream)
.await
{
Ok(stream) => stream,
Err(err) => {
return Err(Error::SocketError(format!("Server TLS error: {:?}", err)))
}
};
StreamInner::Tls { stream }
}
// Server does not support TLS
'N' => StreamInner::Plain { stream },
// Something else?
m => {
return Err(Error::SocketError(format!(
"Unknown message: {}",
m as char
)));
}
}
} else {
StreamInner::Plain { stream }
};
// let (read, write) = split(stream);
// let (mut read, mut write) = (ReadInner::Plain { stream: read }, WriteInner::Plain { stream: write });
trace!("Sending StartupMessage");
// StartupMessage
@@ -245,7 +399,7 @@ impl Server {
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
if sasl_type == SCRAM_SHA_256 {
if sasl_type.contains(SCRAM_SHA_256) {
debug!("Using {}", SCRAM_SHA_256);
// Generate client message.
@@ -268,7 +422,7 @@ impl Server {
res.put_i32(sasl_response.len() as i32);
res.put(sasl_response);
write_all(&mut stream, res).await?;
write_all_flush(&mut stream, &res).await?;
} else {
error!("Unsupported SCRAM version: {}", sasl_type);
return Err(Error::ServerError);
@@ -299,7 +453,7 @@ impl Server {
res.put_i32(4 + sasl_response.len() as i32);
res.put(sasl_response);
write_all(&mut stream, res).await?;
write_all_flush(&mut stream, &res).await?;
}
SASL_FINAL => {
@@ -443,12 +597,9 @@ impl Server {
}
};
let (read, write) = stream.into_split();
let mut server = Server {
address: address.clone(),
read: BufReader::new(read),
write,
stream: BufStream::new(stream),
buffer: BytesMut::with_capacity(8196),
server_info,
process_id,
@@ -515,7 +666,7 @@ impl Server {
bytes.put_i32(process_id);
bytes.put_i32(secret_key);
write_all(&mut stream, bytes).await
write_all_flush(&mut stream, &bytes).await
}
/// Send messages to the server from the client.
@@ -523,7 +674,7 @@ impl Server {
self.mirror_send(messages);
self.stats().data_sent(messages.len());
match write_all_half(&mut self.write, messages).await {
match write_all_flush(&mut self.stream, &messages).await {
Ok(_) => {
// Successfully sent to server
self.last_activity = SystemTime::now();
@@ -542,7 +693,7 @@ impl Server {
/// in order to receive all data the server has to offer.
pub async fn recv(&mut self) -> Result<BytesMut, Error> {
loop {
let mut message = match read_message(&mut self.read).await {
let mut message = match read_message(&mut self.stream).await {
Ok(message) => message,
Err(err) => {
error!("Terminating server because of: {:?}", err);
@@ -935,13 +1086,13 @@ impl Drop for Server {
// Update statistics
self.stats.disconnect();
let mut bytes = BytesMut::with_capacity(4);
let mut bytes = BytesMut::with_capacity(5);
bytes.put_u8(b'X');
bytes.put_i32(4);
match self.write.try_write(&bytes) {
Ok(_) => (),
Err(_) => debug!("Dirty shutdown"),
match self.stream.get_mut().try_write(&bytes) {
Ok(5) => (),
_ => debug!("Dirty shutdown"),
};
// Should not matter.