From 8e88c47f76f21c1ef3681c44b7944b247cf452a6 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 4 Feb 2022 09:28:52 -0800 Subject: [PATCH] started query cancellation --- README.md | 10 +++++++++ src/client.rs | 58 +++++++++++++++++++++++++++++++++++++++++++------ src/main.rs | 5 +++++ src/messages.rs | 13 +++++++++++ src/server.rs | 22 +++++++++++++++++++ 5 files changed, 101 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 4a4d025..b8ac733 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,16 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su 1. Install Rust (latest stable is fine). 2. `cargo run --release` (to get better benchmarks). +### Tests + +You can just PgBench to test your changes: + +``` +pgbench -i -h 127.0.0.1 -p 5433 && \ +pgbench -t 1000 -p 5433 -h 127.0.0.1 --protocol simple && \ +pgbench -t 1000 -p 5433 -h 127.0.0.1 --protocol extended +``` + ## Features 1. Session mode. diff --git a/src/client.rs b/src/client.rs index d530a4b..5f0a2dc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,26 +1,32 @@ +/// Implementation of the PostgreSQL client. +/// We are pretending to the server in this scenario, +/// and this module implements that. +use bb8::Pool; +use bytes::{Buf, BufMut, BytesMut}; +use rand::{distributions::Alphanumeric, Rng}; use tokio::io::{AsyncReadExt, BufReader, Interest}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; -/// PostgreSQL client (frontend). -/// We are pretending to be the backend. use tokio::net::TcpStream; -use bytes::{Buf, BufMut, BytesMut}; - use crate::errors::Error; use crate::messages::*; - use crate::pool::ServerPool; -use bb8::Pool; -use rand::{distributions::Alphanumeric, Rng}; +/// The client state. pub struct Client { read: BufReader, write: OwnedWriteHalf, buffer: BytesMut, name: String, + cancel_mode: bool, + process_id: i32, + secret_key: i32, } impl Client { + /// Given a TCP socket, trick the client into thinking we are + /// the Postgres server. Perform the authentication and place + /// the client in query-ready mode. pub async fn startup(mut stream: TcpStream) -> Result { loop { // Could be StartupMessage or SSLRequest @@ -54,8 +60,14 @@ impl Client { 196608 => { // TODO: perform actual auth. // TODO: record startup parameters client sends over. + + // Generate random backend ID and secret key + let process_id: i32 = rand::random(); + let secret_key: i32 = rand::random(); + auth_ok(&mut stream).await?; server_parameters(&mut stream).await?; + backend_key_data(&mut stream, process_id, secret_key).await?; ready_for_query(&mut stream).await?; let (read, write) = stream.into_split(); @@ -71,6 +83,27 @@ impl Client { write: write, buffer: BytesMut::with_capacity(8196), name: name, + cancel_mode: false, + process_id: process_id, + secret_key: secret_key, + }); + } + + // Cancel request + 80877102 => { + let (read, write) = stream.into_split(); + + let process_id = bytes.get_i32(); + let secret_key = bytes.get_i32(); + + return Ok(Client { + read: BufReader::new(read), + write: write, + buffer: BytesMut::with_capacity(8196), + name: String::from("cancel_mode"), + cancel_mode: true, + process_id: process_id, + secret_key: secret_key, }); } @@ -81,7 +114,18 @@ impl Client { } } + /// Client loop. We handle all messages between the client and the database here. pub async fn handle(&mut self, pool: Pool) -> Result<(), Error> { + // Special: cancelling existing running query + if self.cancel_mode { + // TODO: Implement this + println!( + ">> Query cancellation requested: {}, {}", + self.process_id, self.secret_key + ); + return Ok(()); + } + loop { // Only grab a connection once we have some traffic on the socket // TODO: this is not the most optimal way to share servers. diff --git a/src/main.rs b/src/main.rs index 1a0ad67..58aa2b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,12 +7,17 @@ extern crate tokio; use bb8::Pool; use tokio::net::TcpListener; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + mod client; mod errors; mod messages; mod pool; mod server; +type ClientServerMap = Arc>>; + #[tokio::main] async fn main() { println!("> Welcome to PgRabbit"); diff --git a/src/messages.rs b/src/messages.rs index 2ba4b37..7c07669 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -28,6 +28,19 @@ pub async fn server_parameters(stream: &mut TcpStream) -> Result<(), Error> { Ok(write_all(stream, res).await?) } +pub async fn backend_key_data( + stream: &mut TcpStream, + backend_id: i32, + secret_key: i32, +) -> Result<(), Error> { + let mut key_data = BytesMut::from(&b"K"[..]); + key_data.put_i32(12); + key_data.put_i32(backend_id); + key_data.put_i32(secret_key); + + Ok(write_all(stream, key_data).await?) +} + pub async fn ready_for_query(stream: &mut TcpStream) -> Result<(), Error> { let mut bytes = BytesMut::with_capacity(5); diff --git a/src/server.rs b/src/server.rs index b83bbb7..2c11efc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,8 @@ #![allow(dead_code)] #![allow(unused_variables)] +///! Implementation of the PostgreSQL server (database) protocol. +///! Here we are pretending to the a Postgres client. use bytes::{Buf, BufMut, BytesMut}; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; @@ -9,6 +11,7 @@ use tokio::net::TcpStream; use crate::errors::Error; use crate::messages::*; +/// Server state. pub struct Server { read: BufReader, write: OwnedWriteHalf, @@ -22,6 +25,8 @@ pub struct Server { } impl Server { + /// Pretend to be the Postgres client and connect to the server given host, port and credentials. + /// Perform the authentication and return the server in a ready-for-query mode. pub async fn startup( host: &str, port: &str, @@ -150,6 +155,7 @@ impl Server { } } + /// Send data to the server from the client. pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { match write_all_half(&mut self.write, messages).await { Ok(_) => Ok(()), @@ -161,6 +167,9 @@ impl Server { } } + /// Receive data from the server in response to a client request sent previously. + /// This method must be called multiple times while `self.is_data_available()` is true + /// in order to receive all data the server has to offer. pub async fn recv(&mut self) -> Result { loop { let mut message = match read_message(&mut self.read).await { @@ -237,27 +246,39 @@ impl Server { Ok(bytes) } + /// If the server is still inside a transaction. + /// If the client disconnects while the server is in a transaction, we will clean it up. pub fn in_transaction(&self) -> bool { self.in_transaction } + /// We don't buffer all of server responses, e.g. COPY OUT produces too much data. + /// The client is responsible to call `self.recv()` while this method returns true. pub fn is_data_available(&self) -> bool { self.data_available } + /// Server & client are out of sync, we must discard this connection. + /// This happens with clients that misbehave. pub fn is_bad(&self) -> bool { self.bad } + /// Get server startup information to forward it to the client. + /// Not used at the moment. pub fn server_info(&self) -> BytesMut { self.server_info.clone() } + /// Indicate that this server connection cannot be re-used and must be discarded. pub fn mark_bad(&mut self) { println!(">> Server marked bad"); self.bad = true; } + /// Execute an arbitrary query against the server. + /// It will use the Simple query protocol. + /// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`. pub async fn query(&mut self, query: &str) -> Result<(), Error> { let mut query = BytesMut::from(&query.as_bytes()[..]); query.put_u8(0); @@ -276,6 +297,7 @@ impl Server { Ok(()) } + /// A shorthand for `SET application_name = $1`. pub async fn set_name(&mut self, name: &str) -> Result<(), Error> { Ok(self .query(&format!("SET application_name = '{}'", name))