diff --git a/pgcat.toml b/pgcat.toml index ffcf722..44bcd00 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -69,3 +69,15 @@ servers = [ # [ "127.0.1.1", 5432, "replica" ], ] database = "shard2" + + +# Settings for our query routing layer. +[query_router] + +# If the client doesn't specify, route traffic to +# this role by default. +# +# any: round-robin between primary and replicas, +# replica: round-robin between replicas only without touching the primary, +# primary: all queries go to the primary unless otherwise specified. +default_role = "any" \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 1460309..a9e7a60 100644 --- a/src/client.rs +++ b/src/client.rs @@ -48,6 +48,10 @@ pub struct Client { // Clients are mapped to servers while they use them. This allows a client // to connect and cancel a query. client_server_map: ClientServerMap, + + // Unless client specifies, route queries to the servers that have this role, + // e.g. primary or replicas or any. + default_server_role: Option, } impl Client { @@ -58,6 +62,7 @@ impl Client { mut stream: TcpStream, client_server_map: ClientServerMap, transaction_mode: bool, + default_server_role: Option, ) -> Result { loop { // Could be StartupMessage or SSLRequest @@ -114,6 +119,7 @@ impl Client { process_id: process_id, secret_key: secret_key, client_server_map: client_server_map, + default_server_role: default_server_role, }); } @@ -133,6 +139,7 @@ impl Client { process_id: process_id, secret_key: secret_key, client_server_map: client_server_map, + default_server_role: default_server_role, }); } @@ -172,7 +179,7 @@ impl Client { let mut shard: Option = None; // Active database role we want to talk to, e.g. primary or replica. - let mut role: Option = None; + let mut role: Option = self.default_server_role; loop { // Read a complete message from the client, which normally would be @@ -275,7 +282,7 @@ impl Client { // Release server if !server.in_transaction() && self.transaction_mode { shard = None; - role = None; + role = self.default_server_role; break; } } @@ -338,7 +345,7 @@ impl Client { // Release server if !server.in_transaction() && self.transaction_mode { shard = None; - role = None; + role = self.default_server_role; break; } } @@ -366,7 +373,7 @@ impl Client { if !server.in_transaction() && self.transaction_mode { println!("Releasing after copy done"); shard = None; - role = None; + role = self.default_server_role; break; } } @@ -382,7 +389,7 @@ impl Client { } /// Release the server from being mine. I can't cancel its queries anymore. - pub fn release(&mut self) { + pub fn release(&self) { let mut guard = self.client_server_map.lock().unwrap(); guard.remove(&(self.process_id, self.secret_key)); } @@ -390,7 +397,7 @@ impl Client { /// Determine if the query is part of our special syntax, extract /// the shard key, and return the shard to query based on Postgres' /// PARTITION BY HASH function. - fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option { + fn select_shard(&self, mut buf: BytesMut, shards: usize) -> Option { let code = buf.get_u8() as char; // Only supporting simpe protocol here, so @@ -425,7 +432,7 @@ impl Client { } // Pick a primary or a replica from the pool. - fn select_role(&mut self, mut buf: BytesMut) -> Option { + fn select_role(&self, mut buf: BytesMut) -> Option { let code = buf.get_u8() as char; // Same story as select_shard() above. diff --git a/src/config.rs b/src/config.rs index b517c78..79c2c0b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -43,11 +43,17 @@ pub struct Shard { pub database: String, } +#[derive(Deserialize, Debug, Clone)] +pub struct QueryRouter { + pub default_role: String, +} + #[derive(Deserialize, Debug, Clone)] pub struct Config { pub general: General, pub user: User, pub shards: HashMap, + pub query_router: QueryRouter, } /// Parse the config. @@ -118,6 +124,19 @@ pub async fn parse(path: &str) -> Result { } } + match config.query_router.default_role.as_ref() { + "any" => (), + "primary" => (), + "replica" => (), + other => { + println!( + "> Query router default_role must be 'primary', 'replica', or 'any', got: '{}'", + other + ); + return Err(Error::BadConfig); + } + }; + Ok(config) } @@ -132,5 +151,6 @@ mod test { assert_eq!(config.shards.len(), 3); assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1"); assert_eq!(config.shards["0"].servers[0].2, "primary"); + assert_eq!(config.query_router.default_role, "any"); } } diff --git a/src/main.rs b/src/main.rs index 8ba9cac..05903d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,6 +40,7 @@ mod sharding; // Support for query cancellation: this maps our process_ids and // secret keys to the backend's. +use config::Role; use pool::{ClientServerMap, ConnectionPool}; /// Main! @@ -87,6 +88,15 @@ async fn main() { let pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await; let transaction_mode = config.general.pool_mode == "transaction"; + let default_server_role = match config.query_router.default_role.as_ref() { + "any" => None, + "primary" => Some(Role::Primary), + "replica" => Some(Role::Replica), + _ => { + println!("> Config error, got unexpected query_router.default_role."); + return; + } + }; println!("> Waiting for clients..."); @@ -109,7 +119,14 @@ async fn main() { addr, transaction_mode ); - match client::Client::startup(socket, client_server_map, transaction_mode).await { + match client::Client::startup( + socket, + client_server_map, + transaction_mode, + default_server_role, + ) + .await + { Ok(mut client) => { println!(">> Client {:?} authenticated successfully!", addr); diff --git a/src/pool.rs b/src/pool.rs index cbe4d1a..8c92bc6 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -124,10 +124,7 @@ impl ConnectionPool { // Make sure if a specific role is requested, it's available in the pool. match role { Some(role) => { - let role_count = addresses - .iter() - .filter(|&db| db.role == Role::Primary) - .count(); + let role_count = addresses.iter().filter(|&db| db.role == role).count(); if role_count == 0 { println!(