Files
pgcat/src/server.rs

485 lines
17 KiB
Rust
Raw Normal View History

2022-02-04 09:28:52 -08:00
//! Implementation of the PostgreSQL server (database) protocol.
//! Here we are pretending to be a Postgres client.
2022-02-03 15:17:04 -08:00
use bytes::{Buf, BufMut, BytesMut};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
};
2022-02-03 15:17:04 -08:00
use crate::config::{Address, User};
use crate::constants::*;
2022-02-03 15:17:04 -08:00
use crate::errors::Error;
use crate::messages::*;
2022-02-14 10:00:55 -08:00
use crate::stats::Reporter;
2022-02-04 16:01:35 -08:00
use crate::ClientServerMap;
2022-02-03 15:17:04 -08:00
2022-02-04 09:28:52 -08:00
/// Server state.
2022-02-03 15:17:04 -08:00
pub struct Server {
// Server host, e.g. localhost,
// port, e.g. 5432, and role, e.g. primary or replica.
address: Address,
2022-02-08 09:25:59 -08:00
// Buffered read socket.
2022-02-03 15:17:04 -08:00
read: BufReader<OwnedReadHalf>,
2022-02-08 09:25:59 -08:00
// Unbuffered write socket (our client code buffers).
2022-02-03 15:17:04 -08:00
write: OwnedWriteHalf,
2022-02-08 09:25:59 -08:00
// Our server response buffer. We buffer data before we give it to the client.
2022-02-03 15:17:04 -08:00
buffer: BytesMut,
2022-02-08 09:25:59 -08:00
// Server information the server sent us over on startup.
2022-02-03 17:48:37 -08:00
server_info: BytesMut,
2022-02-08 09:25:59 -08:00
// Backend id and secret key used for query cancellation.
2022-02-03 17:48:37 -08:00
backend_id: i32,
secret_key: i32,
2022-02-08 09:25:59 -08:00
// Is the server inside a transaction or idle.
2022-02-03 16:25:05 -08:00
in_transaction: bool,
2022-02-08 09:25:59 -08:00
// Is there more data for the client to read.
2022-02-04 08:26:50 -08:00
data_available: bool,
2022-02-08 09:25:59 -08:00
// Is the server broken? We'll remote it from the pool if so.
bad: bool,
2022-02-08 09:25:59 -08:00
// Mapping of clients and servers used for query cancellation.
2022-02-04 16:01:35 -08:00
client_server_map: ClientServerMap,
2022-02-09 20:02:20 -08:00
// Server connected at.
connected_at: chrono::naive::NaiveDateTime,
2022-02-14 10:00:55 -08:00
// Reports various metrics, e.g. data sent & received.
2022-02-14 10:00:55 -08:00
stats: Reporter,
2022-02-03 15:17:04 -08:00
}
impl Server {
2022-02-04 09:28:52 -08:00
/// Pretend to be the Postgres client and connect to the server given host, port and credentials.
/// Perform the authentication and return the server in a ready for query state.
2022-02-03 15:17:04 -08:00
pub async fn startup(
address: &Address,
user: &User,
2022-02-03 15:17:04 -08:00
database: &str,
2022-02-04 16:01:35 -08:00
client_server_map: ClientServerMap,
2022-02-14 10:00:55 -08:00
stats: Reporter,
2022-02-03 15:17:04 -08:00
) -> Result<Server, Error> {
let mut stream =
match TcpStream::connect(&format!("{}:{}", &address.host, &address.port)).await {
Ok(stream) => stream,
Err(err) => {
println!(">> Could not connect to server: {}", err);
return Err(Error::SocketError);
}
};
2022-02-03 15:17:04 -08:00
// Send the startup packet telling the server we're a normal Postgres client.
startup(&mut stream, &user.name, database).await?;
2022-02-03 15:17:04 -08:00
let mut server_info = BytesMut::new();
2022-02-03 17:48:37 -08:00
let mut backend_id: i32 = 0;
let mut secret_key: i32 = 0;
// We'll be handling multiple packets, but they will all be structured the same.
// We'll loop here until this exchange is complete.
2022-02-03 15:17:04 -08:00
loop {
let code = match stream.read_u8().await {
Ok(code) => code as char,
Err(_) => return Err(Error::SocketError),
};
let len = match stream.read_i32().await {
Ok(len) => len,
Err(_) => return Err(Error::SocketError),
};
match code {
// Authentication
2022-02-03 15:17:04 -08:00
'R' => {
// Determine which kind of authentication is required, if any.
let auth_code = match stream.read_i32().await {
Ok(auth_code) => auth_code,
2022-02-03 15:17:04 -08:00
Err(_) => return Err(Error::SocketError),
};
match auth_code {
MD5_ENCRYPTED_PASSWORD => {
// The salt is 4 bytes.
// See: https://www.postgresql.org/docs/12/protocol-message-formats.html
2022-02-03 15:17:04 -08:00
let mut salt = vec![0u8; 4];
match stream.read_exact(&mut salt).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
md5_password(&mut stream, &user.name, &user.password, &salt[..])
.await?;
2022-02-03 15:17:04 -08:00
}
AUTHENTICATION_SUCCESSFUL => (),
2022-02-03 15:17:04 -08:00
_ => {
println!(">> Unsupported authentication mechanism: {}", auth_code);
2022-02-03 15:17:04 -08:00
return Err(Error::ServerError);
}
}
}
// ErrorResponse
2022-02-03 15:17:04 -08:00
'E' => {
2022-02-07 11:15:33 -08:00
let error_code = match stream.read_u8().await {
Ok(error_code) => error_code,
Err(_) => return Err(Error::SocketError),
};
match error_code {
// No error message is present in the message.
MESSAGE_TERMINATOR => (),
// An error message will be present.
2022-02-07 11:15:33 -08:00
_ => {
// Read the error message without the terminating null character.
2022-02-07 11:15:33 -08:00
let mut error = vec![0u8; len as usize - 4 - 1];
2022-02-07 11:15:33 -08:00
match stream.read_exact(&mut error).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
// TODO: the error message contains multiple fields; we can decode them and
// present a prettier message to the user.
// See: https://www.postgresql.org/docs/12/protocol-error-fields.html
2022-02-07 11:15:33 -08:00
println!(">> Server error: {}", String::from_utf8_lossy(&error));
}
};
2022-02-03 15:17:04 -08:00
return Err(Error::ServerError);
}
// ParameterStatus
2022-02-03 15:17:04 -08:00
'S' => {
let mut param = vec![0u8; len as usize - 4];
2022-02-03 15:17:04 -08:00
match stream.read_exact(&mut param).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
2022-02-03 17:48:37 -08:00
// Save the parameter so we can pass it to the client later.
// These can be server_encoding, client_encoding, server timezone, Postgres version,
// and many more interesting things we should know about the Postgres server we are talking to.
2022-02-03 17:48:37 -08:00
server_info.put_u8(b'S');
server_info.put_i32(len);
server_info.put_slice(&param[..]);
2022-02-03 15:17:04 -08:00
}
// BackendKeyData
2022-02-03 15:17:04 -08:00
'K' => {
// The frontend must save these values if it wishes to be able to issue CancelRequest messages later.
// See: https://www.postgresql.org/docs/12/protocol-message-formats.html
2022-02-03 17:48:37 -08:00
backend_id = match stream.read_i32().await {
Ok(id) => id,
Err(_) => return Err(Error::SocketError),
2022-02-03 17:48:37 -08:00
};
secret_key = match stream.read_i32().await {
Ok(id) => id,
Err(_) => return Err(Error::SocketError),
2022-02-03 15:17:04 -08:00
};
}
// ReadyForQuery
2022-02-03 15:17:04 -08:00
'Z' => {
let mut idle = vec![0u8; len as usize - 4];
match stream.read_exact(&mut idle).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};
// This is the last step in the client-server connection setup,
// and indicates the server is ready for to query it.
2022-02-03 15:17:04 -08:00
let (read, write) = stream.into_split();
return Ok(Server {
address: address.clone(),
2022-02-03 15:17:04 -08:00
read: BufReader::new(read),
write: write,
buffer: BytesMut::with_capacity(8196),
2022-02-03 17:48:37 -08:00
server_info: server_info,
backend_id: backend_id,
secret_key: secret_key,
2022-02-03 16:25:05 -08:00
in_transaction: false,
2022-02-04 08:26:50 -08:00
data_available: false,
bad: false,
2022-02-04 16:01:35 -08:00
client_server_map: client_server_map,
connected_at: chrono::offset::Utc::now().naive_utc(),
2022-02-14 10:00:55 -08:00
stats: stats,
2022-02-03 15:17:04 -08:00
});
}
// We have an unexpected message from the server during this exchange.
// Means we implemented the protocol wrong or we're not talking to a Postgres server.
2022-02-03 15:17:04 -08:00
_ => {
println!(">> Unknown code: {}", code);
return Err(Error::ProtocolSyncError);
}
};
}
}
/// Issue a query cancellation request to the server.
2022-02-04 16:01:35 -08:00
/// Uses a separate connection that's not part of the connection pool.
pub async fn cancel(
host: &str,
port: &str,
process_id: i32,
secret_key: i32,
) -> Result<(), Error> {
let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await {
Ok(stream) => stream,
Err(err) => {
println!(">> Could not connect to server: {}", err);
return Err(Error::SocketError);
}
};
let mut bytes = BytesMut::with_capacity(16);
bytes.put_i32(16);
bytes.put_i32(CANCEL_REQUEST_CODE);
2022-02-04 16:01:35 -08:00
bytes.put_i32(process_id);
bytes.put_i32(secret_key);
Ok(write_all(&mut stream, bytes).await?)
}
/// Send messages to the server from the client.
2022-02-03 15:17:04 -08:00
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
2022-02-14 10:00:55 -08:00
self.stats.data_sent(messages.len());
match write_all_half(&mut self.write, messages).await {
Ok(_) => Ok(()),
Err(err) => {
println!(">> Terminating server because of: {:?}", err);
self.bad = true;
Err(err)
}
}
2022-02-03 15:17:04 -08:00
}
/// Receive data from the server in response to a client request.
2022-02-04 09:28:52 -08:00
/// This method must be called multiple times while `self.is_data_available()` is true
/// in order to receive all data the server has to offer.
2022-02-03 15:17:04 -08:00
pub async fn recv(&mut self) -> Result<BytesMut, Error> {
loop {
let mut message = match read_message(&mut self.read).await {
Ok(message) => message,
Err(err) => {
println!(">> Terminating server because of: {:?}", err);
self.bad = true;
return Err(err);
}
};
2022-02-03 15:17:04 -08:00
// Buffer the message we'll forward to the client later.
2022-02-03 15:17:04 -08:00
self.buffer.put(&message[..]);
let code = message.get_u8() as char;
2022-02-03 16:25:05 -08:00
let _len = message.get_i32();
2022-02-03 15:33:26 -08:00
2022-02-03 15:17:04 -08:00
match code {
// ReadyForQuery
2022-02-03 15:17:04 -08:00
'Z' => {
2022-02-03 16:25:05 -08:00
let transaction_state = message.get_u8() as char;
2022-02-03 16:25:05 -08:00
match transaction_state {
// In transaction.
2022-02-03 16:25:05 -08:00
'T' => {
self.in_transaction = true;
}
2022-02-03 16:25:05 -08:00
// Idle, transaction over.
2022-02-03 16:25:05 -08:00
'I' => {
self.in_transaction = false;
}
// Some error occured, the transaction was rolled back.
'E' => {
2022-02-03 18:13:36 -08:00
self.in_transaction = true;
}
2022-02-03 16:25:05 -08:00
// Something totally unexpected, this is not a Postgres server we know.
2022-02-03 16:25:05 -08:00
_ => {
self.bad = true;
return Err(Error::ProtocolSyncError);
}
2022-02-03 16:25:05 -08:00
};
// There is no more data available from the server.
2022-02-05 14:38:41 -08:00
self.data_available = false;
2022-02-03 15:17:04 -08:00
break;
}
2022-02-03 15:17:04 -08:00
// DataRow
2022-02-05 14:38:41 -08:00
'D' => {
// More data is available after this message, this is not the end of the reply.
2022-02-05 14:38:41 -08:00
self.data_available = true;
// Don't flush yet, the more we buffer, the faster this goes...
// up to a limit of course.
2022-02-05 14:38:41 -08:00
if self.buffer.len() >= 8196 {
break;
}
}
// CopyInResponse: copy is starting from client to server.
2022-02-04 08:06:45 -08:00
'G' => break,
// CopyOutResponse: copy is starting from the server to the client.
2022-02-04 08:26:50 -08:00
'H' => {
self.data_available = true;
break;
}
// CopyData: we are not buffering this one because there will be many more
// and we don't know how big this packet could be, best not to take a risk.
2022-02-04 08:26:50 -08:00
'd' => break,
// CopyDone
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
'c' => (),
2022-02-04 08:26:50 -08:00
// Anything else, e.g. errors, notices, etc.
// Keep buffering until ReadyForQuery shows up.
_ => (),
2022-02-03 15:17:04 -08:00
};
}
let bytes = self.buffer.clone();
// Keep track of how much data we got from the server for stats.
2022-02-14 10:00:55 -08:00
self.stats.data_received(bytes.len());
// Clear the buffer for next query.
2022-02-03 15:17:04 -08:00
self.buffer.clear();
// Pass the data back to the client.
2022-02-03 15:17:04 -08:00
Ok(bytes)
}
2022-02-03 16:25:05 -08:00
2022-02-04 09:28:52 -08:00
/// If the server is still inside a transaction.
/// If the client disconnects while the server is in a transaction, we will clean it up.
2022-02-03 16:25:05 -08:00
pub fn in_transaction(&self) -> bool {
self.in_transaction
}
2022-02-04 09:28:52 -08:00
/// We don't buffer all of server responses, e.g. COPY OUT produces too much data.
/// The client is responsible to call `self.recv()` while this method returns true.
2022-02-04 08:26:50 -08:00
pub fn is_data_available(&self) -> bool {
self.data_available
}
2022-02-04 09:28:52 -08:00
/// Server & client are out of sync, we must discard this connection.
/// This happens with clients that misbehave.
pub fn is_bad(&self) -> bool {
self.bad
}
2022-02-04 09:28:52 -08:00
/// Get server startup information to forward it to the client.
/// Not used at the moment.
2022-02-03 17:48:37 -08:00
pub fn server_info(&self) -> BytesMut {
self.server_info.clone()
}
2022-02-04 09:28:52 -08:00
/// Flag this server connection as unusable: it cannot be re-used and must be discarded.
/// A notice is printed to standard output.
pub fn mark_bad(&mut self) {
    println!(">> Server marked bad");

    self.bad = true;
}
2022-02-04 16:08:18 -08:00
/// Claim this server as mine for the purposes of query cancellation.
2022-02-04 16:01:35 -08:00
pub fn claim(&mut self, process_id: i32, secret_key: i32) {
let mut guard = self.client_server_map.lock().unwrap();
2022-02-05 10:02:13 -08:00
guard.insert(
(process_id, secret_key),
(
self.backend_id,
self.secret_key,
self.address.host.clone(),
self.address.port.clone(),
2022-02-05 10:02:13 -08:00
),
);
2022-02-04 16:01:35 -08:00
}
2022-02-04 09:28:52 -08:00
/// Execute an arbitrary query against the server.
/// It will use the simple query protocol.
2022-02-04 09:28:52 -08:00
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
2022-02-03 17:32:04 -08:00
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
let mut query = BytesMut::from(&query.as_bytes()[..]);
query.put_u8(0); // C-string terminator (NULL character).
let len = query.len() as i32 + 4;
let mut msg = BytesMut::with_capacity(len as usize + 1);
msg.put_u8(b'Q');
msg.put_i32(len);
msg.put_slice(&query[..]);
self.send(msg).await?;
2022-02-05 14:38:41 -08:00
loop {
let _ = self.recv().await?;
2022-02-05 14:38:41 -08:00
if !self.data_available {
break;
}
}
Ok(())
}
2022-02-03 17:32:04 -08:00
2022-02-04 09:28:52 -08:00
/// A shorthand for `SET application_name = $1`.
#[allow(dead_code)]
2022-02-03 17:32:04 -08:00
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?)
}
2022-02-05 13:15:53 -08:00
/// Get the servers address.
#[allow(dead_code)]
2022-02-05 13:15:53 -08:00
pub fn address(&self) -> Address {
self.address.clone()
2022-02-05 13:15:53 -08:00
}
2022-02-03 15:17:04 -08:00
}
impl Drop for Server {
    /// Try to do a clean shut down. Best effort because
    /// the socket is in non-blocking mode, so it may not be ready
    /// for a write.
    fn drop(&mut self) {
        // Terminate message: code 'X' followed by the big-endian length 4.
        let terminate: [u8; 5] = [b'X', 0, 0, 0, 4];

        // The outcome is deliberately ignored: the connection is going away either way.
        let _ = self.write.try_write(&terminate);

        self.bad = true;

        let session = chrono::offset::Utc::now().naive_utc() - self.connected_at;

        println!(
            ">> Server connection closed, session duration: {}",
            crate::format_duration(&session)
        );
    }
}