From a556ec1c4388891a37ed8a89d674b99977267bf6 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sat, 19 Feb 2022 08:57:24 -0800 Subject: [PATCH] More query router commands; settings last until changed again; docs (#25) * readme * touch up docs * stuff * refactor query router * remove unused * less verbose * docs * no link * method rename --- README.md | 172 +++++++++++----- src/client.rs | 67 +++--- src/messages.rs | 93 +++++++++ src/query_router.rs | 490 ++++++++++++++++++++++---------------------- tests/ruby/tests.rb | 103 ++++++++-- 5 files changed, 579 insertions(+), 346 deletions(-) diff --git a/README.md b/README.md index 3f1c798..ab309c9 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,26 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su **Alpha**: don't use in production just yet. +## Features + +| **Feature** | **Status** | **Comments** | +|--------------------------------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| +| Transaction pooling | :heavy_check_mark: | Identical to PgBouncer. | +| Session pooling | :heavy_check_mark: | Identical to PgBouncer. | +| `COPY` support | :heavy_check_mark: | Both `COPY TO` and `COPY FROM` are supported. | +| Query cancellation | :heavy_check_mark: | Supported both in transaction and session pooling modes. | +| Load balancing of read queries | :heavy_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). | +| Sharding | :heavy_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. | +| Failover | :heavy_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. | +| Statistics reporting | :heavy_check_mark: | Statistics similar to PgBouncers are reported via StatsD. | +| Live configuration reloading | :x: :wrench: | On the roadmap; currently config changes require restart. | +| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. | + +## Deployment + +See `Dockerfile` for example deployment using Docker. The pooler is configured to spawn 4 workers so 4 CPUs are recommended for optimal performance. +That setting can be adjusted to spawn as many (or as little) workers as needed. + ## Local development 1. Install Rust (latest stable will work great). @@ -18,7 +38,7 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su ### Tests -You can just PgBench to test your changes: +Quickest way to test your changes is to use pgbench: ``` pgbench -i -h 127.0.0.1 -p 6432 && \ @@ -28,80 +48,130 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended See [sharding README](./tests/sharding/README.md) for sharding logic testing. -## Features +| **Feature** | **Tested in CI** | **Tested manually** | **Comments** | +|----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------| +| Transaction pooling | :heavy_check_mark: | :heavy_check_mark: | Used by default for all tests. | +| Session pooling | :x: | :heavy_check_mark: | Easiest way to test is to enable it and run pgbench - results will be better than transaction pooling as expected. | +| `COPY` | :heavy_check_mark: | :heavy_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. | +| Query cancellation | :heavy_check_mark: | :heavy_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. | +| Load balancing | :x: | :heavy_check_mark: | We could test this by emitting statistics for each replica and compare them. | +| Failover | :x: | :heavy_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. | +| Sharding | :heavy_check_mark: | :heavy_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. | +| Statistics reporting | :x: | :heavy_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. | -1. Session mode. -2. Transaction mode. -3. `COPY` protocol support. -4. Query cancellation. -5. Round-robin load balancing of replicas. -6. Banlist & failover. -7. Sharding! -8. Explicit query routing to primary or replicas. + +## Usage ### Session mode -Each client owns its own server for the duration of the session. Commands like `SET` are allowed. -This is identical to PgBouncer session mode. +In session mode, a client talks to one server for the duration of the connection. Prepared statements, `SET`, and advisory locks are supported. In terms of supported features, there is very little if any difference between session mode and talking directly to the server. + +To use session mode, change `pool_mode = "session"`. ### Transaction mode -The connection is attached to the server for the duration of the transaction. `SET` will pollute the connection, -but `SET LOCAL` works great. Identical to PgBouncer transaction mode. +In transaction mode, a client talks to one server for the duration of a single transaction; once it's over, the server is returned to the pool. Prepared statements, `SET`, and advisory locks are not supported; alternatives are to use `SET LOCAL` and `pg_advisory_xact_lock` which are scoped to the transaction. -### COPY protocol -That one isn't particularly special, but good to mention that you can `COPY` data in and from the server -using this pooler. +This mode is enabled by default. -### Query cancellation -Okay, this is just basic stuff, but we support cancelling queries. If you know the Postgres protocol, -this might be relevant given than this is a transactional pooler but if you're new to Pg, don't worry about it, it works. +### Load balancing of read queries +All queries are load balanced against the configured servers using the round-robin algorithm. The most straight forward configuration example would be to put this pooler in front of several replicas and let it load balance all queries. -### Round-robin load balancing -This is the novel part. PgBouncer doesn't support it and suggests we use DNS or a TCP proxy instead. -We prefer to have everything as part of one package; arguably, it's easier to understand and optimize. -This pooler will round-robin between multiple replicas keeping load reasonably even. If the primary is in -the pool as well, it'll be treated as a replica for read-only queries. +If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser will interpret the query and route all `SELECT` queries to a replica, while all other queries including explicit transactions will be routed to the primary. -### Banlist & failover -This is where it gets even more interesting. If we fail to connect to one of the replicas or it fails a health check, -we add it to a ban list. No more new transactions will be served by that replica for, in our case, 60 seconds. This -gives it the opportunity to recover while clients are happily served by the remaining replicas. +The query parser is disabled by default. -This decreases error rates substantially! Worth noting here that on busy systems, if the replicas are running too hot, -failing over could bring even more load and tip over the remaining healthy-ish replicas. In this case, a decision should be made: -either lose 1/x of your traffic or risk losing it all eventually. Ideally you overprovision your system, so you don't necessarily need -to make this choice :-). - -### Sharding -We're implemeting Postgres' `PARTITION BY HASH` sharding function for `BIGINT` fields. This works well for tables that use `BIGSERIAL` primary key which I think is common enough these days. We can also add many more functions here, but this is a good start. See `src/sharding.rs` and `tests/sharding/partition_hash_test_setup.sql` for more details on the implementation. - -The biggest advantage of using this sharding function is that anyone can shard the dataset using Postgres partitions -while also access it for both reads and writes using this pooler. No custom obscure sharding function is needed and database sharding can be done entirely in Postgres. - -To select the shard we want to talk to, we introduced special syntax: +#### Query parser +The query parser will do its best to determine where the query should go, but sometimes that's not possible. In that case, the client can select which server it wants using this custom SQL syntax: ```sql +-- To talk to the primary for the duration of the next transaction: +SET SERVER ROLE TO 'primary'; + +-- To talk to the replica for the duration of the next transaction: +SET SERVER ROLE TO 'replica'; + +-- Let the query parser decide +SET SERVER ROLE TO 'auto'; + +-- Pick any server at random +SET SERVER ROLE TO 'any'; + +-- Reset to default configured settings +SET SERVER ROLE TO 'default'; +``` + +The setting will persist until it's changed again or the client disconnects. + +By default, all queries are routed to all servers; `default_role` setting controls this behavior. + +### Failover +All servers are checked with a `SELECT 1` query before being given to a client. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned. + +The ban time can be changed with `ban_time`. The default is 60 seconds. + +### Sharding +We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows to shard the database using Postgres partitions and place the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler. + +To route queries to a particular shard, we use this custom SQL syntax: + +```sql +-- To talk to a shard explicitely +SET SHARD TO '1'; + +-- To let the pooler choose based on a value SET SHARDING KEY TO '1234'; ``` -This sharding key will be hashed and the pooler will select a shard to use for the next transaction. If the pooler is in session mode, this sharding key has to be set as the first query on startup & cannot be changed until the client re-connects. +The active shard will last until it's changed again or the client disconnects. By default, the queries are routed to shard 0. -### Explicit read/write query routing +For hash function implementation, see `src/sharding.rs` and `tests/sharding/partition_hash_test_setup.sql`. -If you want to have the primary and replicas in the same pooler, you'd probably want to -route queries explicitely to the primary or replicas, depending if they are reads or writes (e.g `SELECT`s or `INSERT`/`UPDATE`, etc). To help with this, we introduce some more custom syntax: +#### ActiveRecord/Rails -```sql -SET SERVER ROLE TO 'primary'; -SET SERVER ROLE TO 'replica'; +```ruby +class User < ActiveRecord::Base +end + +# Metadata will be fetched from shard 0 +ActiveRecord::Base.establish_connection + +# Grab a bunch of users from shard 1 +User.connection.execute "SET SHARD TO '1'" +User.take(10) + +# Using id as the sharding key +User.connection.execute "SET SHARDING KEY TO '1234'" +User.find_by_id(1234) + +# Using geographical sharding +User.connection.execute "SET SERVER ROLE TO 'primary'" +User.connection.execute "SET SHARDING KEY TO '85'" +User.create(name: "test user", email: "test@example.com", zone_id: 85) + +# Let the query parser figure out where the query should go. +# We are still on shard = hash(85) % shards. +User.connection.execute "SET SERVER ROLE TO 'auto'" +User.find_by_email("test@example.com") ``` -After executing this, the next transaction will be routed to the primary or replica respectively. By default, all queries will be load-balanced between all servers, so if the client wants to write or talk to the primary, they have to explicitely select it using the syntax above. +#### Raw SQL +```sql +-- Grab a bunch of users from shard 1 +SET SHARD TO '1'; +SELECT * FROM users LIMT 10; +-- Find by id +SET SHARDING KEY TO '1234'; +SELECT * FROM USERS WHERE id = 1234; -## Missing +-- Writing in a primary/replicas configuration. +SET SHARDING ROLE TO 'primary'; +SET SHARDING KEY TO '85'; +INSERT INTO users (name, email, zome_id) VALUES ('test user', 'test@example.com', 85); -1. Authentication, ehem, this proxy is letting anyone in at the moment. +SET SERVER ROLE TO 'auto'; -- let the query router figure out where the query should go +SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts until set again; we are reading from the primary +``` ## Benchmarks diff --git a/src/client.rs b/src/client.rs index 184d46d..834bde0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -14,7 +14,7 @@ use crate::constants::*; use crate::errors::Error; use crate::messages::*; use crate::pool::{ClientServerMap, ConnectionPool}; -use crate::query_router::QueryRouter; +use crate::query_router::{Command, QueryRouter}; use crate::server::Server; use crate::stats::Reporter; @@ -198,29 +198,50 @@ impl Client { // SET SHARDING KEY TO 'bigint'; let mut message = read_message(&mut self.read).await?; - // Parse for special select shard command. - // SET SHARDING KEY TO 'bigint'; - if query_router.select_shard(message.clone()) { - custom_protocol_response_ok( + // Handle all custom protocol commands here. + match query_router.try_execute_command(message.clone()) { + // Normal query + None => { + if query_router.query_parser_enabled() && query_router.role() == None { + query_router.infer_role(message.clone()); + } + } + + Some((Command::SetShard, _)) | Some((Command::SetShardingKey, _)) => { + custom_protocol_response_ok(&mut self.write, &format!("SET SHARD")).await?; + continue; + } + + Some((Command::SetServerRole, _)) => { + custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; + continue; + } + + Some((Command::ShowServerRole, value)) => { + show_response(&mut self.write, "server role", &value).await?; + continue; + } + + Some((Command::ShowShard, value)) => { + show_response(&mut self.write, "shard", &value).await?; + continue; + } + }; + + // Make sure we selected a valid shard. + if query_router.shard() >= pool.shards() { + error_response( &mut self.write, - &format!("SET SHARD TO {}", query_router.shard()), + &format!( + "shard '{}' is more than configured '{}'", + query_router.shard(), + pool.shards() + ), ) .await?; continue; } - // Parse for special server role selection command. - // SET SERVER ROLE TO '(primary|replica)'; - if query_router.select_role(message.clone()) { - custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; - continue; - } - - // Attempt to parse the query to determine where it should go - if query_router.query_parser_enabled() && query_router.role() == None { - query_router.infer_role(message.clone()); - } - // Grab a server from the pool: the client issued a regular query. let connection = match pool.get(query_router.shard(), query_router.role()).await { Ok(conn) => conn, @@ -228,7 +249,6 @@ impl Client { println!(">> Could not get connection from pool: {:?}", err); error_response(&mut self.write, "could not get connection from the pool") .await?; - query_router.reset(); continue; } }; @@ -310,9 +330,6 @@ impl Client { if self.transaction_mode { // Report this client as idle. self.stats.client_idle(); - - query_router.reset(); - break; } } @@ -395,9 +412,6 @@ impl Client { if self.transaction_mode { self.stats.client_idle(); - - query_router.reset(); - break; } } @@ -431,8 +445,7 @@ impl Client { self.stats.transaction(); if self.transaction_mode { - query_router.reset(); - + self.stats.client_idle(); break; } } diff --git a/src/messages.rs b/src/messages.rs index 3b5914d..70dbc10 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -38,6 +38,16 @@ pub async fn backend_key_data( Ok(write_all(stream, key_data).await?) } +pub fn simple_query(query: &str) -> BytesMut { + let mut res = BytesMut::from(&b"Q"[..]); + let query = format!("{}\0", query); + + res.put_i32(query.len() as i32 + 4); + res.put_slice(&query.as_bytes()); + + res +} + /// Tell the client we're ready for another query. pub async fn ready_for_query(stream: &mut TcpStream) -> Result<(), Error> { let mut bytes = BytesMut::with_capacity(5); @@ -229,6 +239,89 @@ pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Resul Ok(write_all_half(stream, res).await?) } +/// Respond to a SHOW SHARD command. +pub async fn show_response( + stream: &mut OwnedWriteHalf, + name: &str, + value: &str, +) -> Result<(), Error> { + // A SELECT response consists of: + // 1. RowDescription + // 2. One or more DataRow + // 3. CommandComplete + // 4. ReadyForQuery + + // RowDescription + let mut row_desc = BytesMut::new(); + + // Number of columns: 1 + row_desc.put_i16(1); + + // Column name + row_desc.put_slice(&format!("{}\0", name).as_bytes()); + + // Doesn't belong to any table + row_desc.put_i32(0); + + // Doesn't belong to any table + row_desc.put_i16(0); + + // Text + row_desc.put_i32(25); + + // Text size = variable (-1) + row_desc.put_i16(-1); + + // Type modifier: none that I know + row_desc.put_i32(0); + + // Format being used: text (0), binary (1) + row_desc.put_i16(0); + + // DataRow + let mut data_row = BytesMut::new(); + + // Number of columns + data_row.put_i16(1); + + // Size of the column content (length of the string really) + data_row.put_i32(value.len() as i32); + + // The content + data_row.put_slice(value.as_bytes()); + + // CommandComplete + let mut command_complete = BytesMut::new(); + + // Number of rows returned (just one) + command_complete.put_slice(&b"SELECT 1\0"[..]); + + // The final messages sent to the client + let mut res = BytesMut::new(); + + // RowDescription + res.put_u8(b'T'); + res.put_i32(row_desc.len() as i32 + 4); + res.put(row_desc); + + // DataRow + res.put_u8(b'D'); + res.put_i32(data_row.len() as i32 + 4); + res.put(data_row); + + // CommandComplete + res.put_u8(b'C'); + res.put_i32(command_complete.len() as i32 + 4); + res.put(command_complete); + + // ReadyForQuery + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, res).await +} + /// Write all data in the buffer to the TcpStream. pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> { match stream.write_all(&buf).await { diff --git a/src/query_router.rs b/src/query_router.rs index c02e6a6..c43ca49 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -1,22 +1,32 @@ +use crate::config::Role; +use crate::sharding::Sharder; /// Route queries automatically based on explicitely requested /// or implied query characteristics. use bytes::{Buf, BytesMut}; use once_cell::sync::OnceCell; -use regex::{Regex, RegexBuilder}; +use regex::RegexSet; use sqlparser::ast::Statement::{Query, StartTransaction}; use sqlparser::dialect::PostgreSqlDialect; use sqlparser::parser::Parser; -use crate::config::Role; -use crate::sharding::Sharder; +const CUSTOM_SQL_REGEXES: [&str; 5] = [ + r"(?i)SET SHARDING KEY TO '[0-9]+'", + r"(?i)SET SHARD TO '[0-9]+'", + r"(?i)SHOW SHARD", + r"(?i)SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)'", + r"(?i)SHOW SERVER ROLE", +]; -const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+'"; -const SET_SHARD_REGEX: &str = r"SET SHARD TO '[0-9]+'"; -const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)'"; +#[derive(PartialEq, Debug)] +pub enum Command { + SetShardingKey, + SetShard, + ShowShard, + SetServerRole, + ShowServerRole, +} -static SHARDING_REGEX_RE: OnceCell = OnceCell::new(); -static ROLE_REGEX_RE: OnceCell = OnceCell::new(); -static SET_SHARD_REGEX_RE: OnceCell = OnceCell::new(); +static CUSTOM_SQL_REGEX_SET: OnceCell = OnceCell::new(); pub struct QueryRouter { // By default, queries go here, unless we have better information @@ -41,38 +51,18 @@ pub struct QueryRouter { impl QueryRouter { pub fn setup() -> bool { - // Compile our query routing regexes early, so we only do it once. - let a = match SHARDING_REGEX_RE.set( - RegexBuilder::new(SHARDING_REGEX) - .case_insensitive(true) - .build() - .unwrap(), - ) { - Ok(_) => true, - Err(_) => false, + let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) { + Ok(rgx) => rgx, + Err(err) => { + log::error!("QueryRouter::setup Could not compile regex set: {:?}", err); + return false; + } }; - let b = match ROLE_REGEX_RE.set( - RegexBuilder::new(ROLE_REGEX) - .case_insensitive(true) - .build() - .unwrap(), - ) { + match CUSTOM_SQL_REGEX_SET.set(set) { Ok(_) => true, Err(_) => false, - }; - - let c = match SET_SHARD_REGEX_RE.set( - RegexBuilder::new(SET_SHARD_REGEX) - .case_insensitive(true) - .build() - .unwrap(), - ) { - Ok(_) => true, - Err(_) => false, - }; - - a && b && c + } } pub fn new( @@ -92,104 +82,104 @@ impl QueryRouter { } } - /// Determine if the query is part of our special syntax, extract - /// the shard key, and return the shard to query based on Postgres' - /// PARTITION BY HASH function. - pub fn select_shard(&mut self, mut buf: BytesMut) -> bool { + /// Try to parse a command and execute it. + pub fn try_execute_command(&mut self, mut buf: BytesMut) -> Option<(Command, String)> { let code = buf.get_u8() as char; - // Only supporting simpe protocol here, so - // one would have to execute something like this: - // psql -c "SET SHARDING KEY TO '1234'" - // after sanitizing the value manually, which can be just done with an - // int parser, e.g. `let key = "1234".parse::().unwrap()`. - match code { - 'Q' => (), - _ => return false, - }; - - let len = buf.get_i32(); - let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]); // Don't read the ternminating null - - let sharding_key_rgx = match SHARDING_REGEX_RE.get() { - Some(r) => r, - None => return false, - }; - - let set_shard_rgx = match SET_SHARD_REGEX_RE.get() { - Some(r) => r, - None => return false, - }; - - if sharding_key_rgx.is_match(&query) { - let shard = query.split("'").collect::>()[1]; - - match shard.parse::() { - Ok(shard) => { - let sharder = Sharder::new(self.shards); - self.active_shard = Some(sharder.pg_bigint_hash(shard)); - - true - } - - // The shard must be a valid integer. Our regex won't let anything else pass, - // so this code will never run, but Rust can't know that, so we have to handle this - // case anyway. - Err(_) => false, - } - } else if set_shard_rgx.is_match(&query) { - let shard = query.split("'").collect::>()[1]; - match shard.parse::() { - Ok(shard) => { - self.active_shard = Some(shard); - true - } - Err(_) => false, - } - } else { - false + if code != 'Q' { + return None; } - } - /// Pick a primary or a replica from the pool. - pub fn select_role(&mut self, mut buf: BytesMut) -> bool { - let code = buf.get_u8() as char; + let len = buf.get_i32() as usize; + let query = String::from_utf8_lossy(&buf[..len - 5]).to_string(); // Ignore the terminating NULL. - // Same story as select_shard() above. - match code { - 'Q' => (), - _ => return false, + let regex_set = match CUSTOM_SQL_REGEX_SET.get() { + Some(regex_set) => regex_set, + None => return None, }; - let len = buf.get_i32(); - let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]).to_ascii_uppercase(); + let matches: Vec<_> = regex_set.matches(&query).into_iter().collect(); - let rgx = match ROLE_REGEX_RE.get() { - Some(r) => r, - None => return false, - }; - - // Copy / paste from above. If we get one more of these use cases, - // it'll be time to abstract :). - if rgx.is_match(&query) { - let role = query.split("'").collect::>()[1]; - - match role { - "PRIMARY" => { - self.active_role = Some(Role::Primary); - true - } - "REPLICA" => { - self.active_role = Some(Role::Replica); - true - } - - // Our regex won't let this case happen, but Rust can't know that. - _ => false, - } - } else { - false + if matches.len() != 1 { + return None; } + + let command = match matches[0] { + 0 => Command::SetShardingKey, + 1 => Command::SetShard, + 2 => Command::ShowShard, + 3 => Command::SetServerRole, + 4 => Command::ShowServerRole, + _ => unreachable!(), + }; + + let mut value = match command { + Command::SetShardingKey | Command::SetShard | Command::SetServerRole => { + query.split("'").collect::>()[1].to_string() + } + + Command::ShowShard => self.shard().to_string(), + Command::ShowServerRole => match self.active_role { + Some(Role::Primary) => String::from("primary"), + Some(Role::Replica) => String::from("replica"), + None => { + if self.query_parser_enabled { + String::from("auto") + } else { + String::from("any") + } + } + }, + }; + + match command { + Command::SetShardingKey => { + let sharder = Sharder::new(self.shards); + let shard = sharder.pg_bigint_hash(value.parse::().unwrap()); + self.active_shard = Some(shard); + value = shard.to_string(); + } + + Command::SetShard => { + self.active_shard = Some(value.parse::().unwrap()); + } + + Command::SetServerRole => { + self.active_role = match value.to_ascii_lowercase().as_ref() { + "primary" => { + self.query_parser_enabled = false; + Some(Role::Primary) + } + + "replica" => { + self.query_parser_enabled = false; + Some(Role::Replica) + } + + "any" => { + self.query_parser_enabled = false; + None + } + + "auto" => { + self.query_parser_enabled = true; + None + } + + "default" => { + // TODO: reset query parser to default here. + self.active_role = self.default_server_role; + self.active_role + } + + _ => unreachable!(), + }; + } + + _ => (), + } + + Some((command, value)) } /// Try to infer which server to connect to based on the contents of the query. @@ -270,13 +260,13 @@ impl QueryRouter { pub fn shard(&self) -> usize { match self.active_shard { Some(shard) => shard, - None => 0, // TODO: pick random shard + None => 0, } } /// Reset the router back to defaults. /// This must be called at the end of every transaction in transaction mode. - pub fn reset(&mut self) { + pub fn _reset(&mut self) { self.active_role = self.default_server_role; self.active_shard = None; } @@ -290,87 +280,18 @@ impl QueryRouter { #[cfg(test)] mod test { use super::*; + use crate::messages::simple_query; use bytes::BufMut; - #[test] - fn test_select_shard() { - QueryRouter::setup(); - - let default_server_role: Option = None; - let shards = 5; - let mut query_router = QueryRouter::new(default_server_role, shards, false, false); - - // Build the special syntax query. - let mut message = BytesMut::new(); - let query = BytesMut::from(&b"SET SHARDING KEY TO '13';\0"[..]); - - message.put_u8(b'Q'); // Query - message.put_i32(query.len() as i32 + 4); - message.put_slice(&query[..]); - - assert!(query_router.select_shard(message)); - assert_eq!(query_router.shard(), 3); // See sharding.rs (we are using 5 shards on purpose in this test) - - query_router.reset(); - assert_eq!(query_router.shard(), 0); - } - - #[test] - fn test_select_replica() { - QueryRouter::setup(); - - let default_server_role: Option = None; - let shards = 5; - let mut query_router = QueryRouter::new(default_server_role, shards, false, false); - - // Build the special syntax query. - let mut message = BytesMut::new(); - let query = BytesMut::from(&b"SET SERVER ROLE TO 'replica';\0"[..]); - - message.put_u8(b'Q'); // Query - message.put_i32(query.len() as i32 + 4); - message.put_slice(&query[..]); - - assert!(query_router.select_role(message)); - assert_eq!(query_router.role(), Some(Role::Replica)); - - query_router.reset(); - - assert_eq!(query_router.role(), default_server_role); - } - #[test] fn test_defaults() { QueryRouter::setup(); let default_server_role: Option = None; let shards = 5; - let query_router = QueryRouter::new(default_server_role, shards, false, false); + let qr = QueryRouter::new(default_server_role, shards, false, false); - assert_eq!(query_router.shard(), 0); - assert_eq!(query_router.role(), None); - } - - #[test] - fn test_incorrect_syntax() { - QueryRouter::setup(); - - let default_server_role: Option = None; - let shards = 5; - let mut query_router = QueryRouter::new(default_server_role, shards, false, false); - - // Build the special syntax query. - let mut message = BytesMut::new(); - - // Typo! - let query = BytesMut::from(&b"SET SERVER RLE TO 'replica';\0"[..]); - - message.put_u8(b'Q'); // Query - message.put_i32(query.len() as i32 + 4); - message.put_slice(&query[..]); - - assert_eq!(query_router.select_shard(message.clone()), false); - assert_eq!(query_router.select_role(message.clone()), false); + assert_eq!(qr.role(), None); } #[test] @@ -379,23 +300,20 @@ mod test { let default_server_role: Option = None; let shards = 5; - - let mut query_router = QueryRouter::new(default_server_role, shards, false, false); + let mut qr = QueryRouter::new(default_server_role, shards, false, false); let queries = vec![ - BytesMut::from(&b"SELECT * FROM items WHERE id = 5\0"[..]), - BytesMut::from(&b"SELECT id, name, value FROM items INNER JOIN prices ON item.id = prices.item_id\0"[..]), - BytesMut::from(&b"WITH t AS (SELECT * FROM items) SELECT * FROM t\0"[..]), + simple_query("SELECT * FROM items WHERE id = 5"), + simple_query( + "SELECT id, name, value FROM items INNER JOIN prices ON item.id = prices.item_id", + ), + simple_query("WITH t AS (SELECT * FROM items) SELECT * FROM t"), ]; - for query in &queries { - let mut res = BytesMut::from(&b"Q"[..]); - res.put_i32(query.len() as i32 + 4); - res.put(query.clone()); - + for query in queries { // It's a recognized query - assert!(query_router.infer_role(res)); - assert_eq!(query_router.role(), Some(Role::Replica)); + assert!(qr.infer_role(query)); + assert_eq!(qr.role(), Some(Role::Replica)); } } @@ -405,24 +323,19 @@ mod test { let default_server_role: Option = None; let shards = 5; - - let mut query_router = QueryRouter::new(default_server_role, shards, false, false); + let mut qr = QueryRouter::new(default_server_role, shards, false, false); let queries = vec![ - BytesMut::from(&b"UPDATE items SET name = 'pumpkin' WHERE id = 5\0"[..]), - BytesMut::from(&b"INSERT INTO items (id, name) VALUES (5, 'pumpkin')\0"[..]), - BytesMut::from(&b"DELETE FROM items WHERE id = 5\0"[..]), - BytesMut::from(&b"BEGIN\0"[..]), // Transaction start + simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"), + simple_query("INSERT INTO items (id, name) VALUES (5, 'pumpkin')"), + simple_query("DELETE FROM items WHERE id = 5"), + simple_query("BEGIN"), // Transaction start ]; - for query in &queries { - let mut res = BytesMut::from(&b"Q"[..]); - res.put_i32(query.len() as i32 + 4); - res.put(query.clone()); - + for query in queries { // It's a recognized query - assert!(query_router.infer_role(res)); - assert_eq!(query_router.role(), Some(Role::Primary)); + assert!(qr.infer_role(query)); + assert_eq!(qr.role(), Some(Role::Primary)); } } @@ -432,16 +345,11 @@ mod test { let default_server_role: Option = None; let shards = 5; + let mut qr = QueryRouter::new(default_server_role, shards, true, false); + let query = simple_query("SELECT * FROM items WHERE id = 5"); - let mut query_router = QueryRouter::new(default_server_role, shards, true, false); - - let query = BytesMut::from(&b"SELECT * FROM items WHERE id = 5\0"[..]); - let mut res = BytesMut::from(&b"Q"[..]); - res.put_i32(query.len() as i32 + 4); - res.put(query.clone()); - - assert!(query_router.infer_role(res)); - assert_eq!(query_router.role(), None); + assert!(qr.infer_role(query)); + assert_eq!(qr.role(), None); } #[test] @@ -467,26 +375,112 @@ mod test { } #[test] - fn test_set_shard_explicitely() { + fn test_regex_set() { QueryRouter::setup(); - let default_server_role: Option = None; - let shards = 5; + let tests = [ + // Upper case + "SET SHARDING KEY TO '1'", + "SET SHARD TO '1'", + "SHOW SHARD", + "SET SERVER ROLE TO 'replica'", + "SET SERVER ROLE TO 'primary'", + "SET SERVER ROLE TO 'any'", + "SET SERVER ROLE TO 'auto'", + "SHOW SERVER ROLE", + // Lower case + "set sharding key to '1'", + "set shard to '1'", + "show shard", + "set server role to 'replica'", + "set server role to 'primary'", + "set server role to 'any'", + "set server role to 'auto'", + "show server role", + ]; - let mut query_router = QueryRouter::new(default_server_role, shards, false, false); + let set = CUSTOM_SQL_REGEX_SET.get().unwrap(); - // Build the special syntax query. - let mut message = BytesMut::new(); - let query = BytesMut::from(&b"SET SHARD TO '1'\0"[..]); + for test in &tests { + let matches: Vec<_> = set.matches(test).into_iter().collect(); - message.put_u8(b'Q'); // Query - message.put_i32(query.len() as i32 + 4); - message.put_slice(&query[..]); + assert_eq!(matches.len(), 1); + } + } - assert!(query_router.select_shard(message)); - assert_eq!(query_router.shard(), 1); // See sharding.rs (we are using 5 shards on purpose in this test) + #[test] + fn test_try_execute_command() { + QueryRouter::setup(); + let mut qr = QueryRouter::new(Some(Role::Primary), 5, false, false); - query_router.reset(); - assert_eq!(query_router.shard(), 0); + // SetShardingKey + let query = simple_query("SET SHARDING KEY TO '13'"); + assert_eq!( + qr.try_execute_command(query), + Some((Command::SetShardingKey, String::from("3"))) + ); + assert_eq!(qr.shard(), 3); + + // SetShard + let query = simple_query("SET SHARD TO '1'"); + assert_eq!( + qr.try_execute_command(query), + Some((Command::SetShard, String::from("1"))) + ); + assert_eq!(qr.shard(), 1); + + // ShowShard + let query = simple_query("SHOW SHARD"); + assert_eq!( + qr.try_execute_command(query), + Some((Command::ShowShard, String::from("1"))) + ); + + // SetServerRole + let roles = ["primary", "replica", "any", "auto", "primary"]; + let verify_roles = [ + Some(Role::Primary), + Some(Role::Replica), + None, + None, + Some(Role::Primary), + ]; + let query_parser_enabled = [false, false, false, true, false]; + + for (idx, role) in roles.iter().enumerate() { + let query = simple_query(&format!("SET SERVER ROLE TO '{}'", role)); + assert_eq!( + qr.try_execute_command(query), + Some((Command::SetServerRole, String::from(*role))) + ); + assert_eq!(qr.role(), verify_roles[idx],); + assert_eq!(qr.query_parser_enabled(), query_parser_enabled[idx],); + + // ShowServerRole + let query = simple_query("SHOW SERVER ROLE"); + assert_eq!( + qr.try_execute_command(query), + Some((Command::ShowServerRole, String::from(*role))) + ); + } + } + + #[test] + fn test_enable_query_parser() { + QueryRouter::setup(); + let mut qr = QueryRouter::new(None, 5, false, false); + let query = simple_query("SET SERVER ROLE TO 'auto'"); + + assert!(qr.try_execute_command(query) != None); + assert!(qr.query_parser_enabled()); + assert_eq!(qr.role(), None); + + let query = simple_query("INSERT INTO test_table VALUES (1)"); + assert_eq!(qr.infer_role(query), true); + assert_eq!(qr.role(), Some(Role::Primary)); + + let query = simple_query("SELECT * FROM test_table"); + assert_eq!(qr.infer_role(query), true); + assert_eq!(qr.role(), Some(Role::Replica)); } } diff --git a/tests/ruby/tests.rb b/tests/ruby/tests.rb index 18d1bfb..3ff39ab 100644 --- a/tests/ruby/tests.rb +++ b/tests/ruby/tests.rb @@ -1,7 +1,8 @@ require "active_record" -ActiveRecord.verbose_query_logs = true -ActiveRecord::Base.logger = Logger.new(STDOUT) +# Uncomment these two to see all queries. +# ActiveRecord.verbose_query_logs = true +# ActiveRecord::Base.logger = Logger.new(STDOUT) ActiveRecord::Base.establish_connection( adapter: "postgresql", @@ -18,6 +19,13 @@ class TestTable < ActiveRecord::Base self.table_name = "test_table" end +class TestSafeTable < ActiveRecord::Base + self.table_name = "test_safe_table" +end + +class ShouldNeverHappenException < Exception +end + # # Create the table. class CreateTestTable < ActiveRecord::Migration[7.0] # Disable transasctions or things will fly out of order! @@ -30,6 +38,7 @@ class CreateTestTable < ActiveRecord::Migration[7.0] # This will make this migration reversible! reversible do connection.execute "SET SHARD TO '#{x.to_i}'" + connection.execute "SET SERVER ROLE TO 'primary'" end # Always wrap the entire migration inside a transaction. If that's not possible, @@ -47,28 +56,82 @@ class CreateTestTable < ActiveRecord::Migration[7.0] end end -begin - CreateTestTable.migrate(:down) -rescue Exception - puts "Tables don't exist yet" -end +class CreateSafeShardedTable < ActiveRecord::Migration[7.0] + # Disable transasctions or things will fly out of order! + disable_ddl_transaction! -CreateTestTable.migrate(:up) + SHARDS = 3 -10.times do |x| - x += 1 # Postgres ids start at 1 - r = TestTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'" + def up + SHARDS.times do |x| + # This will make this migration reversible! + connection.execute "SET SHARD TO '#{x.to_i}'" + connection.execute "SET SERVER ROLE TO 'primary'" - # Always wrap writes inside explicit transactions like these because ActiveRecord may fetch table info - # before actually issuing the `INSERT` statement. This ensures that that happens inside a transaction - # and the write goes to the correct shard. - TestTable.connection.transaction do - TestTable.create(id: x, name: "something_special_#{x.to_i}", description: "It's a surprise!") + connection.execute <<-SQL + CREATE TABLE test_safe_table ( + id BIGINT PRIMARY KEY, + name VARCHAR, + description TEXT + ) PARTITION BY HASH (id); + + CREATE TABLE test_safe_table_data PARTITION OF test_safe_table + FOR VALUES WITH (MODULUS #{SHARDS.to_i}, REMAINDER #{x.to_i}); + SQL + end + end + + def down + SHARDS.times do |x| + connection.execute "SET SHARD TO '#{x.to_i}'" + connection.execute "SET SERVER ROLE TO 'primary'" + connection.execute "DROP TABLE test_safe_table CASCADE" + end end end -10.times do |x| - x += 1 # 0 confuses our sharding function - TestTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'" - puts TestTable.find_by_id(x).id +20.times do + begin + CreateTestTable.migrate(:down) + rescue Exception + puts "Tables don't exist yet" + end + + begin + CreateSafeShardedTable.migrate(:down) + rescue Exception + puts "Tables don't exist yet" + end + + CreateTestTable.migrate(:up) + CreateSafeShardedTable.migrate(:up) + + 3.times do |x| + TestSafeTable.connection.execute "SET SHARD TO '#{x.to_i}'" + TestSafeTable.connection.execute "SET SERVER ROLE TO 'primary'" + TestSafeTable.connection.execute "TRUNCATE #{TestTable.table_name}" + end + + 10.times do |x| + x += 1 # Postgres ids start at 1 + TestSafeTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'" + TestSafeTable.connection.execute "SET SERVER ROLE TO 'primary'" + TestSafeTable.create(id: x, name: "something_special_#{x.to_i}", description: "It's a surprise!") + end + + 10.times do |x| + x += 1 # 0 confuses our sharding function + TestSafeTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'" + TestSafeTable.connection.execute "SET SERVER ROLE TO 'replica'" + TestSafeTable.find_by_id(x).id + end +end + +# Test wrong shard +TestSafeTable.connection.execute "SET SHARD TO '1'" +begin + TestSafeTable.create(id: 5, name: "test", description: "test description") + raise ShouldNeverHappenException("Uh oh") +rescue ActiveRecord::StatementInvalid + puts "OK" end