diff --git a/src/client.rs b/src/client.rs
index 5b0e218..5502a3e 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -12,16 +12,32 @@
 use crate::messages::*;
 use crate::pool::{ClientServerMap, ConnectionPool};
 use crate::server::Server;

-/// The client state.
+/// The client state. One of these is created per client.
 pub struct Client {
+    // The reads are buffered (8K by default).
     read: BufReader<OwnedReadHalf>,
+
+    // We buffer the writes ourselves because we know the protocol
+    // better than a stock buffer.
     write: OwnedWriteHalf,
+
+    // Internal buffer, where we place messages until we have to flush
+    // them to the backend.
     buffer: BytesMut,
-    name: String,
+
+    // The client was started with the sole purpose of cancelling another running query.
     cancel_mode: bool,
+
+    // In transaction mode, the connection is released after each transaction.
+    // Session mode has slightly higher throughput per client, but lower capacity.
     transaction_mode: bool,
+
+    // For query cancellation, the client is given a random process ID and secret on startup.
     process_id: i32,
     secret_key: i32,
+
+    // Clients are mapped to servers while they use them. This allows a new
+    // connection to cancel a query running on behalf of another client.
     client_server_map: ClientServerMap,
 }
@@ -75,19 +91,14 @@ impl Client {
                 backend_key_data(&mut stream, process_id, secret_key).await?;
                 ready_for_query(&mut stream).await?;

+                // Split the read and write streams
+                // so we can control buffering.
                 let (read, write) = stream.into_split();

-                let name: String = rand::thread_rng()
-                    .sample_iter(&Alphanumeric)
-                    .take(7)
-                    .map(char::from)
-                    .collect();
-
                 return Ok(Client {
                     read: BufReader::new(read),
                     write: write,
                     buffer: BytesMut::with_capacity(8196),
-                    name: name,
                     cancel_mode: false,
                     transaction_mode: true,
                     process_id: process_id,
@@ -96,7 +107,7 @@ impl Client {
                 });
             }

-            // Cancel request
+            // Query cancel request.
            80877102 => {
                let (read, write) = stream.into_split();

@@ -107,7 +118,6 @@ impl Client {
                    read: BufReader::new(read),
                    write: write,
                    buffer: BytesMut::with_capacity(8196),
-                    name: String::from("cancel_mode"),
                    cancel_mode: true,
                    transaction_mode: true,
                    process_id: process_id,
@@ -130,6 +140,7 @@
        let (process_id, secret_key, address, port) = {
            let guard = self.client_server_map.lock().unwrap();
            match guard.get(&(self.process_id, self.secret_key)) {
+                // Drop the mutex as soon as possible.
                Some((process_id, secret_key, address, port)) => (
                    process_id.clone(),
                    secret_key.clone(),
@@ -145,51 +156,52 @@
        }

        loop {
-            // Only grab a connection once we have some traffic on the socket
-            // TODO: this is not the most optimal way to share servers.
-            // let mut peek_buf = vec![0u8; 2];
+            // Read a complete message from the client, which normally would be
+            // either a `Q` (query) or `P` (prepare, extended protocol).
+            // We can parse it here before grabbing a server from the pool,
+            // in case the client is sending some control messages, e.g.
+            // SET sharding_context.key = '1234';
+            let mut message = read_message(&mut self.read).await?;

-            // match self.read.get_mut().peek(&mut peek_buf).await {
-            //     Ok(_) => (),
-            //     Err(_) => return Err(Error::ClientDisconnected),
-            // };
-            let message = read_message(&mut self.read).await?;
+            // TODO: parse the message here. If it's part of our protocol,
+            // don't grab a server yet and continue the loop.

-            self.buffer.put(message);
+            // The message is part of the regular protocol.
+            // self.buffer.put(message);

-            let mut proxy = pool.get(None).await.unwrap().0;
+            // Grab a server from the pool.
+            // None = any shard.
+            let connection = pool.get(None).await.unwrap();
+            let mut proxy = connection.0;
+            let _address = connection.1;
            let server = &mut *proxy;

-            // TODO: maybe don't do this, I don't think it's useful.
-            server.set_name(&self.name).await?;
-
            // Claim this server as mine for query cancellation.
            server.claim(self.process_id, self.secret_key);

            loop {
-                let mut message = match self.buffer.len() {
-                    0 => {
-                        match read_message(&mut self.read).await {
-                            Ok(message) => message,
-                            Err(err) => {
-                                if server.in_transaction() {
-                                    // TODO: this is what PgBouncer does
-                                    // which leads to connection thrashing.
-                                    //
-                                    // I think we could issue a ROLLBACK here instead.
-                                    server.mark_bad();
-                                }
-
-                                return Err(err);
+                // No messages in the buffer, read one.
+                let mut message = if message.len() == 0 {
+                    match read_message(&mut self.read).await {
+                        Ok(message) => message,
+                        Err(err) => {
+                            // Client disconnected without warning.
+                            if server.in_transaction() {
+                                // TODO: this is what PgBouncer does,
+                                // which leads to connection thrashing.
+                                //
+                                // I think we could issue a ROLLBACK here instead.
+                                // server.mark_bad();
+                                server.query("ROLLBACK; DISCARD ALL;").await?;
                            }
+
+                            return Err(err);
                        }
                    }
-
-                    _ => {
-                        let message = self.buffer.clone();
-                        self.buffer.clear();
-                        message
-                    }
+                } else {
+                    let msg = message.clone();
+                    message.clear();
+                    msg
                };

                let original = message.clone(); // To be forwarded to the server
@@ -222,9 +234,13 @@
                    }

                    'X' => {
-                        // Client closing
+                        // Client closing. Rollback and clean up the
+                        // connection before releasing it into the pool.
+                        // PgBouncer closes the connection, which leads to
+                        // connection thrashing when clients misbehave.
+                        // This pool will protect the database. :salute:
                        if server.in_transaction() {
-                            server.query("ROLLBACK").await?;
+                            server.query("ROLLBACK; DISCARD ALL;").await?;
                        }

                        return Ok(());
@@ -239,6 +255,7 @@
                        self.buffer.put(&original[..]);
                    }

+                    // Describe
                    'D' => {
                        self.buffer.put(&original[..]);
                    }
diff --git a/src/main.rs b/src/main.rs
index e2e4dd5..ba6f76b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -75,14 +75,6 @@ async fn main() {
    let database = "lev";
    let pool = ConnectionPool::new(addresses, user, database, client_server_map.clone()).await;

-    // We are round-robining, so ideally the replicas will be equally loaded.
-    // Therefore, we are allocating number of replicas * pool size of connections.
-    // However, if a replica dies, the remaining replicas will share the burden,
-    // also equally.
-    //
-    // Note that failover in this case could bring down the remaining replicas, so
-    // in certain situations, e.g. when replicas are running hot already, failover
-    // is not at all desirable!!
    loop {
        let pool = pool.clone();
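Note: the cancellation flow above hinges on ClientServerMap, whose definition lives in src/pool.rs and is not part of this diff. The following is only a sketch of the shape implied by its usage here; the exact key/value types, the u16 port, and the helper name are assumptions, not the actual code.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Assumed shape, inferred from `guard.get(&(self.process_id, self.secret_key))`:
// the key is the (process_id, secret_key) pair handed to the client at startup,
// the value is everything needed to cancel the query on the right backend.
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, u16)>>>;

// Hypothetical helper mirroring the lookup in the diff: clone the entry out
// so the mutex is dropped as soon as possible, then cancel outside the lock.
pub fn server_for_cancel(
    map: &ClientServerMap,
    process_id: i32,
    secret_key: i32,
) -> Option<(i32, i32, String, u16)> {
    map.lock().unwrap().get(&(process_id, secret_key)).cloned()
}

The other notable change is the cleanup path: on client disconnect and on 'X' (Terminate), the pool now issues ROLLBACK; DISCARD ALL; instead of closing the server connection like PgBouncer does. DISCARD ALL resets session state (prepared statements, temp tables, session GUCs, advisory locks), so the connection can be handed to the next client without thrashing backend connections.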