From 28c70d47b6b39b97eec629c4b62864d753dfdba2 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 9 Feb 2022 20:02:20 -0800 Subject: [PATCH 01/10] #1 Primary/replica selection --- pgcat.toml | 20 ++++++++--------- src/client.rs | 59 +++++++++++++++++++++++++++++++++++++++++++++---- src/config.rs | 10 ++++++++- src/messages.rs | 10 ++++++--- src/pool.rs | 28 ++++++++++++++++++++++- src/server.rs | 7 +++++- 6 files changed, 114 insertions(+), 20 deletions(-) diff --git a/pgcat.toml b/pgcat.toml index 78f5570..803a342 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -43,26 +43,26 @@ password = "sharding_user" # Shard 0 [shards.0] -# [ host, port ] +# [ host, port, role ] servers = [ - [ "127.0.0.1", 5432 ], - [ "localhost", 5432 ], + [ "127.0.0.1", 5432, "primary" ], + [ "localhost", 5432, "replica" ], ] # Database name (e.g. "postgres") database = "shard0" [shards.1] -# [ host, port ] +# [ host, port, role ] servers = [ - [ "127.0.0.1", 5432 ], - [ "localhost", 5432 ], + [ "127.0.0.1", 5432, "primary" ], + [ "localhost", 5432, "replica" ], ] database = "shard1" [shards.2] -# [ host, port ] +# [ host, port, role ] servers = [ - [ "127.0.0.1", 5432 ], - [ "localhost", 5432 ], + [ "127.0.0.1", 5432, "primary" ], + [ "localhost", 5432, "replica" ], ] -database = "shard2" \ No newline at end of file +database = "shard2" diff --git a/src/client.rs b/src/client.rs index 85cfbe9..5df6077 100644 --- a/src/client.rs +++ b/src/client.rs @@ -7,6 +7,7 @@ use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; +use crate::config::Role; use crate::errors::Error; use crate::messages::*; use crate::pool::{ClientServerMap, ConnectionPool}; @@ -14,6 +15,7 @@ use crate::server::Server; use crate::sharding::Sharder; const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+';"; +const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)';"; /// The client state. One of these is created per client. 
pub struct Client { @@ -45,6 +47,9 @@ pub struct Client { // sharding regex sharding_regex: Regex, + + // role detection regex + role_regex: Regex, } impl Client { @@ -57,6 +62,7 @@ impl Client { transaction_mode: bool, ) -> Result { let sharding_regex = Regex::new(SHARDING_REGEX).unwrap(); + let role_regex = Regex::new(ROLE_REGEX).unwrap(); loop { // Could be StartupMessage or SSLRequest @@ -114,6 +120,7 @@ impl Client { secret_key: secret_key, client_server_map: client_server_map, sharding_regex: sharding_regex, + role_regex: role_regex, }); } @@ -134,6 +141,7 @@ impl Client { secret_key: secret_key, client_server_map: client_server_map, sharding_regex: sharding_regex, + role_regex: role_regex, }); } @@ -172,6 +180,8 @@ impl Client { // - if in transaction mode, this lives for the duration of one transaction. let mut shard: Option = None; + let mut role: Option = None; + loop { // Read a complete message from the client, which normally would be // either a `Q` (query) or `P` (prepare, extended protocol). @@ -182,18 +192,29 @@ impl Client { // Parse for special select shard command. // SET SHARDING KEY TO 'bigint'; - match self.select_shard(message.clone(), pool.shards()).await { + match self.select_shard(message.clone(), pool.shards()) { Some(s) => { - set_sharding_key(&mut self.write).await?; + custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?; shard = Some(s); continue; } None => (), }; + // Parse for special server role selection command. + // + match self.select_role(message.clone()) { + Some(r) => { + custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; + role = Some(r); + continue; + } + None => (), + }; + // Grab a server from the pool. 
// None = any shard - let connection = pool.get(shard).await.unwrap(); + let connection = pool.get(shard, role).await.unwrap(); let mut proxy = connection.0; let _address = connection.1; let server = &mut *proxy; @@ -252,6 +273,7 @@ impl Client { // Release server if !server.in_transaction() && self.transaction_mode { shard = None; + role = None; break; } } @@ -311,6 +333,7 @@ impl Client { // Release server if !server.in_transaction() && self.transaction_mode { shard = None; + role = None; break; } } @@ -338,6 +361,7 @@ impl Client { if !server.in_transaction() && self.transaction_mode { println!("Releasing after copy done"); shard = None; + role = None; break; } } @@ -361,7 +385,7 @@ impl Client { /// Determine if the query is part of our special syntax, extract /// the shard key, and return the shard to query based on Postgres' /// PARTITION BY HASH function. - async fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option { + fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option { let code = buf.get_u8() as char; // Only supporting simpe protocol here, so @@ -390,4 +414,31 @@ impl Client { None } } + + // Pick a primary or a replica from the pool. + fn select_role(&mut self, mut buf: BytesMut) -> Option { + let code = buf.get_u8() as char; + + // Same story as select_shard() above. + match code { + 'Q' => (), + _ => return None, + }; + + let len = buf.get_i32(); + let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]).to_ascii_uppercase(); + + // Copy / paste from above. If we get one more of these use cases, + // it'll be time to abstract :). 
+ if self.role_regex.is_match(&query) { + let role = query.split("'").collect::>()[1]; + match role { + "PRIMARY" => Some(Role::Primary), + "REPLICA" => Some(Role::Replica), + _ => return None, + } + } else { + None + } + } } diff --git a/src/config.rs b/src/config.rs index 39218f7..094fd79 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,10 +7,17 @@ use std::collections::HashMap; use crate::errors::Error; +#[derive(Clone, PartialEq, Deserialize, Hash, std::cmp::Eq, Debug, Copy)] +pub enum Role { + Primary, + Replica, +} + #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)] pub struct Address { pub host: String, pub port: String, + pub role: Role, } #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)] @@ -32,7 +39,7 @@ pub struct General { #[derive(Deserialize, Debug, Clone)] pub struct Shard { - pub servers: Vec<(String, u16)>, + pub servers: Vec<(String, u16, String)>, pub database: String, } @@ -83,5 +90,6 @@ mod test { assert_eq!(config.general.pool_size, 15); assert_eq!(config.shards.len(), 3); assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1"); + assert_eq!(config.shards["0"].servers[0].2, "primary"); } } diff --git a/src/messages.rs b/src/messages.rs index 90a6700..5f17d8d 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -141,12 +141,16 @@ pub async fn md5_password( Ok(write_all(stream, message).await?) } -/// Implements a response to our custom `SET SHARDING KEY` command. +/// Implements a response to our custom `SET SHARDING KEY` +/// and `SET SERVER ROLE` commands. /// This tells the client we're ready for the next query. 
-pub async fn set_sharding_key(stream: &mut OwnedWriteHalf) -> Result<(), Error> { +pub async fn custom_protocol_response_ok( + stream: &mut OwnedWriteHalf, + message: &str, +) -> Result<(), Error> { let mut res = BytesMut::with_capacity(25); - let set_complete = BytesMut::from(&"SET SHARDING KEY\0"[..]); + let set_complete = BytesMut::from(&format!("{}\0", message)[..]); let len = (set_complete.len() + 4) as i32; // CommandComplete diff --git a/src/pool.rs b/src/pool.rs index 624818a..57bc066 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use bb8::{ManageConnection, Pool, PooledConnection}; use chrono::naive::NaiveDateTime; -use crate::config::{Address, Config, User}; +use crate::config::{Address, Config, Role, User}; use crate::errors::Error; use crate::server::Server; @@ -48,9 +48,19 @@ impl ConnectionPool { let mut replica_addresses = Vec::new(); for server in &shard.servers { + let role = match server.2.as_ref() { + "primary" => Role::Primary, + "replica" => Role::Replica, + _ => { + println!("> Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2); + Role::Replica + } + }; + let address = Address { host: server.0.clone(), port: server.1.to_string(), + role: role, }; let manager = ServerPool::new( @@ -93,6 +103,7 @@ impl ConnectionPool { pub async fn get( &self, shard: Option, + role: Option, ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { // Set this to false to gain ~3-4% speed. let with_health_check = true; @@ -103,6 +114,9 @@ impl ConnectionPool { }; loop { + // TODO: think about making this local, so multiple clients + // don't compete for the same round-robin integer. + // Especially since we're going to be skipping (see role selection below). 
let index = self.round_robin.fetch_add(1, Ordering::SeqCst) % self.databases[shard].len(); let address = self.addresses[shard][index].clone(); @@ -111,6 +125,17 @@ impl ConnectionPool { continue; } + // Make sure you're getting a primary or a replica + // as per request. + match role { + Some(role) => { + if address.role != role { + continue; + } + } + None => (), + }; + // Check if we can connect // TODO: implement query wait timeout, i.e. time to get a conn from the pool let mut conn = match self.databases[shard][index].get().await { @@ -251,6 +276,7 @@ impl ManageConnection for ServerPool { &self.user.password, &self.database, self.client_server_map.clone(), + self.address.role, ) .await } diff --git a/src/server.rs b/src/server.rs index 0a7e31f..13f362a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,7 +8,7 @@ use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; -use crate::config::Address; +use crate::config::{Address, Role}; use crate::errors::Error; use crate::messages::*; use crate::ClientServerMap; @@ -48,6 +48,8 @@ pub struct Server { // Mapping of clients and servers used for query cancellation. 
client_server_map: ClientServerMap, + + role: Role, } impl Server { @@ -60,6 +62,7 @@ impl Server { password: &str, database: &str, client_server_map: ClientServerMap, + role: Role, ) -> Result { let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await { Ok(stream) => stream, @@ -189,6 +192,7 @@ impl Server { data_available: false, bad: false, client_server_map: client_server_map, + role: role, }); } @@ -409,6 +413,7 @@ impl Server { Address { host: self.host.to_string(), port: self.port.to_string(), + role: self.role, } } } From a9b2a41a9bf2d2bab35c2ea175656d6176a71822 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 9 Feb 2022 21:19:14 -0800 Subject: [PATCH 02/10] fixes to the banlist --- pgcat.toml | 3 ++ src/client.rs | 15 ++++++- src/config.rs | 17 +++++++- src/errors.rs | 1 + src/pool.rs | 39 +++++++++++++++---- tests/sharding/query_routing.sh | 9 ++++- .../query_routing_test_primary_replica.sql | 13 +++++++ 7 files changed, 87 insertions(+), 10 deletions(-) create mode 100644 tests/sharding/query_routing_test_primary_replica.sql diff --git a/pgcat.toml b/pgcat.toml index 803a342..ffcf722 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -47,6 +47,7 @@ password = "sharding_user" servers = [ [ "127.0.0.1", 5432, "primary" ], [ "localhost", 5432, "replica" ], + # [ "127.0.1.1", 5432, "replica" ], ] # Database name (e.g. "postgres") database = "shard0" @@ -56,6 +57,7 @@ database = "shard0" servers = [ [ "127.0.0.1", 5432, "primary" ], [ "localhost", 5432, "replica" ], + # [ "127.0.1.1", 5432, "replica" ], ] database = "shard1" @@ -64,5 +66,6 @@ database = "shard1" servers = [ [ "127.0.0.1", 5432, "primary" ], [ "localhost", 5432, "replica" ], + # [ "127.0.1.1", 5432, "replica" ], ] database = "shard2" diff --git a/src/client.rs b/src/client.rs index 5df6077..7d7e27a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -214,7 +214,14 @@ impl Client { // Grab a server from the pool. 
// None = any shard - let connection = pool.get(shard, role).await.unwrap(); + let connection = match pool.get(shard, role).await { + Ok(conn) => conn, + Err(err) => { + println!(">> Could not get connection from pool: {:?}", err); + return Err(err); + } + }; + let mut proxy = connection.0; let _address = connection.1; let server = &mut *proxy; @@ -253,10 +260,13 @@ impl Client { match code { 'Q' => { + // TODO: implement retries here for read-only transactions. server.send(original).await?; loop { + // TODO: implement retries here for read-only transactions. let response = server.recv().await?; + match write_all_half(&mut self.write, response).await { Ok(_) => (), Err(err) => { @@ -312,10 +322,13 @@ impl Client { 'S' => { // Extended protocol, client requests sync self.buffer.put(&original[..]); + + // TODO: retries for read-only transactions server.send(self.buffer.clone()).await?; self.buffer.clear(); loop { + // TODO: retries for read-only transactions let response = server.recv().await?; match write_all_half(&mut self.write, response).await { Ok(_) => (), diff --git a/src/config.rs b/src/config.rs index 094fd79..fe9206d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ use tokio::fs::File; use tokio::io::AsyncReadExt; use toml; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use crate::errors::Error; @@ -77,6 +77,21 @@ pub async fn parse(path: &str) -> Result { } }; + // We use addresses as unique identifiers, + // let's make sure they are unique in the config as well. 
+ for shard in &config.shards { + let mut dup_check = HashSet::new(); + + for server in &shard.1.servers { + dup_check.insert(server); + } + + if dup_check.len() != shard.1.servers.len() { + println!("> Shard {} contains duplicate server configs.", &shard.0); + return Err(Error::BadConfig); + } + } + Ok(config) } diff --git a/src/errors.rs b/src/errors.rs index 3dcbf74..1fc26bb 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -8,4 +8,5 @@ pub enum Error { // ServerTimeout, // DirtyServer, BadConfig, + AllServersDown, } diff --git a/src/pool.rs b/src/pool.rs index 57bc066..49e13e5 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -113,7 +113,17 @@ impl ConnectionPool { None => 0, // TODO: pick a shard at random }; - loop { + let mut allowed_attempts = match role { + // Primary-specific queries get one attempt, if the primary is down, + // nothing we can do. + Some(Role::Primary) => 1, + + // Replicas get to try as many times as there are replicas. + Some(Role::Replica) => self.databases[shard].len(), + None => self.databases[shard].len(), + }; + + while allowed_attempts > 0 { // TODO: think about making this local, so multiple clients // don't compete for the same round-robin integer. // Especially since we're going to be skipping (see role selection below). @@ -121,21 +131,27 @@ impl ConnectionPool { self.round_robin.fetch_add(1, Ordering::SeqCst) % self.databases[shard].len(); let address = self.addresses[shard][index].clone(); - if self.is_banned(&address, shard) { - continue; - } - // Make sure you're getting a primary or a replica // as per request. match role { Some(role) => { - if address.role != role { + // If the client wants a specific role, + // we'll do our best to pick it, but if we only + // have one server in the cluster, it's probably only a primary + // (or only a replica), so the client will just get what we have. 
+ if address.role != role && self.addresses[shard].len() > 1 { continue; } } None => (), }; + if self.is_banned(&address, shard, role) { + continue; + } + + allowed_attempts -= 1; + // Check if we can connect // TODO: implement query wait timeout, i.e. time to get a conn from the pool let mut conn = match self.databases[shard][index].get().await { @@ -183,6 +199,8 @@ impl ConnectionPool { } } } + + return Err(Error::AllServersDown); } /// Ban an address (i.e. replica). It no longer will serve @@ -204,7 +222,14 @@ impl ConnectionPool { /// Check if a replica can serve traffic. If all replicas are banned, /// we unban all of them. Better to try then not to. - pub fn is_banned(&self, address: &Address, shard: usize) -> bool { + pub fn is_banned(&self, address: &Address, shard: usize, role: Option) -> bool { + // If primary is requested explicitely, it can never be banned. + if Some(Role::Primary) == role { + return false; + } + + // If you're not asking for the primary, + // all databases are treated as replicas. let mut guard = self.banlist.lock().unwrap(); // Everything is banned = nothing is banned. diff --git a/tests/sharding/query_routing.sh b/tests/sharding/query_routing.sh index d1b2b84..78aaa60 100644 --- a/tests/sharding/query_routing.sh +++ b/tests/sharding/query_routing.sh @@ -1,7 +1,12 @@ #/bin/bash +set -e # Setup all the shards. -sudo service postgresql restart +# sudo service postgresql restart + +echo "Giving Postgres 5 seconds to start up..." 
+ +# sleep 5 psql -f query_routing_setup.sql @@ -9,4 +14,6 @@ psql -h 127.0.0.1 -p 6432 -f query_routing_test_insert.sql psql -h 127.0.0.1 -p 6432 -f query_routing_test_select.sql +psql -e -h 127.0.0.1 -p 6432 -f query_routing_test_primary_replica.sql + psql -f query_routing_test_validate.sql \ No newline at end of file diff --git a/tests/sharding/query_routing_test_primary_replica.sql b/tests/sharding/query_routing_test_primary_replica.sql new file mode 100644 index 0000000..06a734c --- /dev/null +++ b/tests/sharding/query_routing_test_primary_replica.sql @@ -0,0 +1,13 @@ +SET SERVER ROLE TO 'primary'; +SELECT 1; + +SET SERVER ROLE TO 'replica'; +SELECT 1; + +SET SHARDING KEY TO '1234'; +SET SERVER ROLE TO 'primary'; +SELECT 1; + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '4321'; +SELECT 1; \ No newline at end of file From fccfb402583defdb2853c91753863052d1f48f9c Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 9 Feb 2022 21:20:20 -0800 Subject: [PATCH 03/10] nl --- tests/sharding/query_routing.sh | 2 +- tests/sharding/query_routing_test_primary_replica.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sharding/query_routing.sh b/tests/sharding/query_routing.sh index 78aaa60..acc8532 100644 --- a/tests/sharding/query_routing.sh +++ b/tests/sharding/query_routing.sh @@ -16,4 +16,4 @@ psql -h 127.0.0.1 -p 6432 -f query_routing_test_select.sql psql -e -h 127.0.0.1 -p 6432 -f query_routing_test_primary_replica.sql -psql -f query_routing_test_validate.sql \ No newline at end of file +psql -f query_routing_test_validate.sql diff --git a/tests/sharding/query_routing_test_primary_replica.sql b/tests/sharding/query_routing_test_primary_replica.sql index 06a734c..0188dbe 100644 --- a/tests/sharding/query_routing_test_primary_replica.sql +++ b/tests/sharding/query_routing_test_primary_replica.sql @@ -10,4 +10,4 @@ SELECT 1; SET SERVER ROLE TO 'replica'; SET SHARDING KEY TO '4321'; -SELECT 1; \ No newline at end of file +SELECT 1; 
From 6d5ab79ed3eae394fed43bb4f89207d12567dcdc Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 9 Feb 2022 21:25:17 -0800 Subject: [PATCH 04/10] readme --- README.md | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 25e213d..d91ddd6 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,9 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing. 3. `COPY` protocol support. 4. Query cancellation. 5. Round-robin load balancing of replicas. -6. Banlist & failover +6. Banlist & failover. 7. Sharding! +8. Explicit query routing to primary or replicas. ### Session mode Each client owns its own server for the duration of the session. Commands like `SET` are allowed. @@ -56,7 +57,8 @@ this might be relevant given than this is a transactional pooler but if you're n ### Round-robin load balancing This is the novel part. PgBouncer doesn't support it and suggests we use DNS or a TCP proxy instead. We prefer to have everything as part of one package; arguably, it's easier to understand and optimize. -This pooler will round-robin between multiple replicas keeping load reasonably even. +This pooler will round-robin between multiple replicas keeping load reasonably even. If the primary is in +the pool as well, it'll be treated as a replica for read-only queries. ### Banlist & failover This is where it gets even more interesting. If we fail to connect to one of the replicas or it fails a health check, @@ -82,6 +84,19 @@ SET SHARDING KEY TO '1234'; This sharding key will be hashed and the pooler will select a shard to use for the next transaction. If the pooler is in session mode, this sharding key has to be set as the first query on startup & cannot be changed until the client re-connects. 
+### Explicit read/write query routing + +If you want to have the primary and replicas in the same pooler, you'd probably want to +route queries explicitly to the primary or replicas, depending on whether they are reads or writes (e.g. `SELECT`s or `INSERT`/`UPDATE`, etc.). To help with this, we introduce some more custom syntax: + +```sql +SET SERVER ROLE TO 'primary'; +SET SERVER ROLE TO 'replica'; +``` + +After executing this, the next transaction will be routed to the primary or replica respectively. By default, all queries will be load-balanced between all servers, so if the client wants to write or talk to the primary, they have to explicitly select it using the syntax above. + + ## Missing From daf120aeac735325f737f06ba3de4c383e291562 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 10 Feb 2022 08:35:25 -0800 Subject: [PATCH 05/10] more tests --- src/client.rs | 1 + .../query_routing_test_primary_replica.sql | 148 +++++++++++++++++- 2 files changed, 141 insertions(+), 8 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7d7e27a..d9bd074 100644 --- a/src/client.rs +++ b/src/client.rs @@ -180,6 +180,7 @@ impl Client { // - if in transaction mode, this lives for the duration of one transaction. let mut shard: Option = None; + // Active database role we want to talk to, e.g. primary or replica. 
let mut role: Option = None; loop { diff --git a/tests/sharding/query_routing_test_primary_replica.sql b/tests/sharding/query_routing_test_primary_replica.sql index 0188dbe..358b073 100644 --- a/tests/sharding/query_routing_test_primary_replica.sql +++ b/tests/sharding/query_routing_test_primary_replica.sql @@ -1,13 +1,145 @@ +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '1'; +INSERT INTO data (id, value) VALUES (1, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '1'; +SELECT * FROM data WHERE id = 1; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '2'; +INSERT INTO data (id, value) VALUES (2, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '2'; +SELECT * FROM data WHERE id = 2; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '3'; +INSERT INTO data (id, value) VALUES (3, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '3'; +SELECT * FROM data WHERE id = 3; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '4'; +INSERT INTO data (id, value) VALUES (4, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '4'; +SELECT * FROM data WHERE id = 4; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '5'; +INSERT INTO data (id, value) VALUES (5, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '5'; +SELECT * FROM data WHERE id = 5; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '6'; +INSERT INTO data (id, value) VALUES (6, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '6'; +SELECT * FROM data WHERE id = 6; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '7'; +INSERT INTO data (id, value) VALUES (7, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '7'; +SELECT * FROM data WHERE id = 7; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '8'; +INSERT INTO data (id, value) VALUES (8, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '8'; +SELECT * FROM 
data WHERE id = 8; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '9'; +INSERT INTO data (id, value) VALUES (9, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '9'; +SELECT * FROM data WHERE id = 9; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '10'; +INSERT INTO data (id, value) VALUES (10, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '10'; +SELECT * FROM data WHERE id = 10; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '11'; +INSERT INTO data (id, value) VALUES (11, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '11'; +SELECT * FROM data WHERE id = 11; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '12'; +INSERT INTO data (id, value) VALUES (12, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '12'; +SELECT * FROM data WHERE id = 12; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '13'; +INSERT INTO data (id, value) VALUES (13, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '13'; +SELECT * FROM data WHERE id = 13; + +--- + +SET SERVER ROLE TO 'primary'; +SET SHARDING KEY TO '14'; +INSERT INTO data (id, value) VALUES (14, 'value_1'); + +SET SERVER ROLE TO 'replica'; +SET SHARDING KEY TO '14'; +SELECT * FROM data WHERE id = 14; + +--- + SET SERVER ROLE TO 'primary'; SELECT 1; SET SERVER ROLE TO 'replica'; SELECT 1; - -SET SHARDING KEY TO '1234'; -SET SERVER ROLE TO 'primary'; -SELECT 1; - -SET SERVER ROLE TO 'replica'; -SET SHARDING KEY TO '4321'; -SELECT 1; From 8209633e05e0abb5126b69b0892d1c6117636085 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 10 Feb 2022 08:54:06 -0800 Subject: [PATCH 06/10] pool fixes --- src/main.rs | 1 + src/pool.rs | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9b084ec..3673e0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -73,6 +73,7 @@ async fn main() { "> Healthcheck timeout: 
{}ms", config.general.healthcheck_timeout ); + println!("> Connection timeout: {}ms", config.general.connect_timeout); let pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await; let transaction_mode = config.general.pool_mode == "transaction"; diff --git a/src/pool.rs b/src/pool.rs index 49e13e5..8cf3ada 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -26,6 +26,7 @@ pub struct ConnectionPool { banlist: BanList, healthcheck_timeout: u64, ban_time: i64, + pool_size: u32, } impl ConnectionPool { @@ -96,6 +97,7 @@ impl ConnectionPool { banlist: Arc::new(Mutex::new(banlist)), healthcheck_timeout: config.general.healthcheck_timeout, ban_time: config.general.ban_time, + pool_size: config.general.pool_size, } } @@ -115,12 +117,29 @@ impl ConnectionPool { let mut allowed_attempts = match role { // Primary-specific queries get one attempt, if the primary is down, - // nothing we can do. - Some(Role::Primary) => 1, + // nothing we should do about it I think. It's dangerous to retry + // write queries. + Some(Role::Primary) => { + // Make sure we have a primary in the pool configured. + let primary_present = self.addresses[shard] + .iter() + .filter(|&db| db.role == Role::Primary) + .count(); - // Replicas get to try as many times as there are replicas. - Some(Role::Replica) => self.databases[shard].len(), - None => self.databases[shard].len(), + // TODO: return this error to the client, so people don't have to look in + // the logs to figure out what happened. + if primary_present == 0 { + println!(">> Error: Primary requested but none are configured."); + return Err(Error::AllServersDown); + } + + // Primary gets one attempt. + 1 + } + + // Replicas get to try as many times as there are replicas + // and connections in the pool. 
+ _ => self.databases[shard].len() * self.pool_size as usize, }; while allowed_attempts > 0 { @@ -184,6 +203,9 @@ impl ConnectionPool { ">> Banning replica {} because of failed health check", index ); + // Don't leave a bad connection in the pool. + server.mark_bad(); + self.ban(&address, shard); continue; } @@ -194,6 +216,9 @@ impl ConnectionPool { ">> Banning replica {} because of health check timeout", index ); + // Don't leave a bad connection in the pool. + server.mark_bad(); + self.ban(&address, shard); continue; } From c1476d29da5c4343a62ebfa2cebc48e8212d2993 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 10 Feb 2022 09:07:10 -0800 Subject: [PATCH 07/10] config tests --- src/config.rs | 30 ++++++++++++++++++++++++++++-- tests/sharding/query_routing.sh | 2 +- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index fe9206d..b517c78 100644 --- a/src/config.rs +++ b/src/config.rs @@ -77,13 +77,39 @@ pub async fn parse(path: &str) -> Result { } }; - // We use addresses as unique identifiers, - // let's make sure they are unique in the config as well. + // Quick config sanity check. for shard in &config.shards { + // We use addresses as unique identifiers, + // let's make sure they are unique in the config as well. let mut dup_check = HashSet::new(); + let mut primary_count = 0; for server in &shard.1.servers { dup_check.insert(server); + + // Check that we define only zero or one primary. + match server.2.as_ref() { + "primary" => primary_count += 1, + _ => (), + }; + + // Check role spelling. 
+ match server.2.as_ref() { + "primary" => (), + "replica" => (), + _ => { + println!( + "> Shard {} server role must be either 'primary' or 'replica', got: '{}'", + shard.0, server.2 + ); + return Err(Error::BadConfig); + } + }; + } + + if primary_count > 1 { + println!("> Shard {} has more than on primary configured.", &shard.0); + return Err(Error::BadConfig); } if dup_check.len() != shard.1.servers.len() { diff --git a/tests/sharding/query_routing.sh b/tests/sharding/query_routing.sh index acc8532..d6098fa 100644 --- a/tests/sharding/query_routing.sh +++ b/tests/sharding/query_routing.sh @@ -8,7 +8,7 @@ echo "Giving Postgres 5 seconds to start up..." # sleep 5 -psql -f query_routing_setup.sql +# psql -f query_routing_setup.sql psql -h 127.0.0.1 -p 6432 -f query_routing_test_insert.sql From 22c6f13dc7b9488b4329b80ee927fd6115f82428 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 10 Feb 2022 10:37:49 -0800 Subject: [PATCH 08/10] removed atomic round-robin --- src/client.rs | 2 +- src/pool.rs | 75 +++++++++++++++++++++++++++++---------------------- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/src/client.rs b/src/client.rs index d9bd074..b397692 100644 --- a/src/client.rs +++ b/src/client.rs @@ -153,7 +153,7 @@ impl Client { } /// Client loop. We handle all messages between the client and the database here. - pub async fn handle(&mut self, pool: ConnectionPool) -> Result<(), Error> { + pub async fn handle(&mut self, mut pool: ConnectionPool) -> Result<(), Error> { // Special: cancelling existing running query if self.cancel_mode { let (process_id, secret_key, address, port) = { diff --git a/src/pool.rs b/src/pool.rs index 8cf3ada..0b9b62b 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -9,20 +9,20 @@ use crate::server::Server; use std::collections::HashMap; use std::sync::{ - atomic::{AtomicUsize, Ordering}, + // atomic::{AtomicUsize, Ordering}, Arc, Mutex, }; // Banlist: bad servers go in here. 
pub type BanList = Arc>>>; -pub type Counter = Arc; +// pub type Counter = Arc; pub type ClientServerMap = Arc>>; #[derive(Clone, Debug)] pub struct ConnectionPool { databases: Vec>>, addresses: Vec>, - round_robin: Counter, + round_robin: usize, banlist: BanList, healthcheck_timeout: u64, ban_time: i64, @@ -90,10 +90,13 @@ impl ConnectionPool { banlist.push(HashMap::new()); } + assert_eq!(shards.len(), addresses.len()); + let address_len = addresses.len(); + ConnectionPool { databases: shards, addresses: addresses, - round_robin: Arc::new(AtomicUsize::new(0)), + round_robin: rand::random::() % address_len, // Start at a random replica banlist: Arc::new(Mutex::new(banlist)), healthcheck_timeout: config.general.healthcheck_timeout, ban_time: config.general.ban_time, @@ -103,7 +106,7 @@ impl ConnectionPool { /// Get a connection from the pool. pub async fn get( - &self, + &mut self, shard: Option, role: Option, ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { @@ -115,40 +118,48 @@ impl ConnectionPool { None => 0, // TODO: pick a shard at random }; - let mut allowed_attempts = match role { - // Primary-specific queries get one attempt, if the primary is down, - // nothing we should do about it I think. It's dangerous to retry - // write queries. - Some(Role::Primary) => { - // Make sure we have a primary in the pool configured. - let primary_present = self.addresses[shard] + let addresses = &self.addresses[shard]; + + // Make sure if a specific role is requested, it's available in the pool. + match role { + Some(role) => { + let role_count = addresses .iter() .filter(|&db| db.role == Role::Primary) .count(); - // TODO: return this error to the client, so people don't have to look in - // the logs to figure out what happened. 
- if primary_present == 0 { - println!(">> Error: Primary requested but none are configured."); + if role_count == 0 { + println!( + ">> Error: Role '{:?}' requested, but none are configured.", + role + ); + return Err(Error::AllServersDown); } - - // Primary gets one attempt. - 1 } + // Any role should be present. + _ => (), + }; + + let mut allowed_attempts = match role { + // Primary-specific queries get one attempt, if the primary is down, + // nothing we should do about it I think. It's dangerous to retry + // write queries. + Some(Role::Primary) => 1, + // Replicas get to try as many times as there are replicas // and connections in the pool. _ => self.databases[shard].len() * self.pool_size as usize, }; while allowed_attempts > 0 { - // TODO: think about making this local, so multiple clients - // don't compete for the same round-robin integer. - // Especially since we're going to be skipping (see role selection below). - let index = - self.round_robin.fetch_add(1, Ordering::SeqCst) % self.databases[shard].len(); - let address = self.addresses[shard][index].clone(); + // Round-robin each client's queries. + // If a client only sends one query and then disconnects, it doesn't matter + // which replica it'll go to. + self.round_robin += 1; + let index = self.round_robin % addresses.len(); + let address = &addresses[index]; // Make sure you're getting a primary or a replica // as per request. @@ -158,14 +169,14 @@ impl ConnectionPool { // we'll do our best to pick it, but if we only // have one server in the cluster, it's probably only a primary // (or only a replica), so the client will just get what we have. 
- if address.role != role && self.addresses[shard].len() > 1 { + if address.role != role && addresses.len() > 1 { continue; } } None => (), }; - if self.is_banned(&address, shard, role) { + if self.is_banned(address, shard, role) { continue; } @@ -177,13 +188,13 @@ impl ConnectionPool { Ok(conn) => conn, Err(err) => { println!(">> Banning replica {}, error: {:?}", index, err); - self.ban(&address, shard); + self.ban(address, shard); continue; } }; if !with_health_check { - return Ok((conn, address)); + return Ok((conn, address.clone())); } // // Check if this server is alive with a health check @@ -197,7 +208,7 @@ impl ConnectionPool { { // Check if health check succeeded Ok(res) => match res { - Ok(_) => return Ok((conn, address)), + Ok(_) => return Ok((conn, address.clone())), Err(_) => { println!( ">> Banning replica {} because of failed health check", @@ -206,7 +217,7 @@ impl ConnectionPool { // Don't leave a bad connection in the pool. server.mark_bad(); - self.ban(&address, shard); + self.ban(address, shard); continue; } }, @@ -219,7 +230,7 @@ impl ConnectionPool { // Don't leave a bad connection in the pool. server.mark_bad(); - self.ban(&address, shard); + self.ban(address, shard); continue; } } From 883b6ee79384e46e2ae0307d03920db7e0b86c9d Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 10 Feb 2022 10:37:55 -0800 Subject: [PATCH 09/10] todo complete --- src/pool.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pool.rs b/src/pool.rs index 0b9b62b..9905e36 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -183,7 +183,6 @@ impl ConnectionPool { allowed_attempts -= 1; // Check if we can connect - // TODO: implement query wait timeout, i.e. 
time to get a conn from the pool let mut conn = match self.databases[shard][index].get().await { Ok(conn) => conn, Err(err) => { From 495d6ce6c32de9030477398edcb73e3f9de3f0ea Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 10 Feb 2022 10:38:06 -0800 Subject: [PATCH 10/10] fmt --- src/pool.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pool.rs b/src/pool.rs index 9905e36..cbe4d1a 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -10,7 +10,8 @@ use crate::server::Server; use std::collections::HashMap; use std::sync::{ // atomic::{AtomicUsize, Ordering}, - Arc, Mutex, + Arc, + Mutex, }; // Banlist: bad servers go in here.