More query router commands; settings last until changed again; docs (#25)

* readme

* touch up docs

* stuff

* refactor query router

* remove unused

* less verbose

* docs

* no link

* method rename
Commit a556ec1c43 (parent bbacb9cf01) by Lev Kokotov, 2022-02-19 08:57:24 -08:00, committed via GitHub.
5 changed files with 579 additions and 346 deletions

README.md

@@ -8,6 +8,26 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su
**Alpha**: don't use in production just yet.
## Features
| **Feature** | **Status** | **Comments** |
|--------------------------------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :heavy_check_mark: | Identical to PgBouncer. |
| Session pooling | :heavy_check_mark: | Identical to PgBouncer. |
| `COPY` support | :heavy_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
| Query cancellation | :heavy_check_mark: | Supported both in transaction and session pooling modes. |
| Load balancing of read queries | :heavy_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
| Sharding | :heavy_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
| Failover | :heavy_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
| Statistics reporting | :heavy_check_mark: | Statistics similar to PgBouncer's are reported via StatsD. |
| Live configuration reloading | :x: :wrench: | On the roadmap; currently config changes require restart. |
| Client authentication | :x: :wrench: | On the roadmap; currently all clients are allowed to connect and one user is used to connect to Postgres. |
## Deployment
See `Dockerfile` for an example deployment using Docker. The pooler is configured to spawn 4 workers, so 4 CPUs are recommended for optimal performance.
That setting can be adjusted to spawn as many (or as few) workers as needed.
## Local development
1. Install Rust (latest stable will work great).
@@ -18,7 +38,7 @@ Meow. PgBouncer rewritten in Rust, with sharding, load balancing and failover su
### Tests
The quickest way to test your changes is to use pgbench:
```
pgbench -i -h 127.0.0.1 -p 6432 && \
@@ -28,80 +48,130 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :heavy_check_mark: | :heavy_check_mark: | Used by default for all tests. |
| Session pooling | :x: | :heavy_check_mark: | Easiest way to test is to enable it and run pgbench; results will be better than with transaction pooling, as expected. |
| `COPY` | :heavy_check_mark: | :heavy_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. |
| Query cancellation | :heavy_check_mark: | :heavy_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. |
| Load balancing | :x: | :heavy_check_mark: | We could test this by emitting statistics for each replica and comparing them. |
| Failover | :x: | :heavy_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. |
| Sharding | :heavy_check_mark: | :heavy_check_mark: | See `tests/sharding` and `tests/ruby` for a Rails/ActiveRecord example. |
| Statistics reporting | :x: | :heavy_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. |
## Usage
### Session mode
In session mode, a client talks to one server for the duration of the connection. Prepared statements, `SET`, and advisory locks are supported. In terms of supported features, there is very little if any difference between session mode and talking directly to the server.

To use session mode, change `pool_mode = "session"`.
### Transaction mode
In transaction mode, a client talks to one server for the duration of a single transaction; once it's over, the server is returned to the pool. Prepared statements, `SET`, and advisory locks are not supported; alternatives are `SET LOCAL` and `pg_advisory_xact_lock`, which are scoped to the transaction.

This mode is enabled by default.
### Load balancing of read queries
All queries are load balanced against the configured servers using the round-robin algorithm. The most straightforward configuration example would be to put this pooler in front of several replicas and let it load balance all queries.

If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser will interpret the query and route all `SELECT` queries to a replica, while all other queries, including explicit transactions, will be routed to the primary.
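The round-robin scheme can be sketched in a few lines. This is a hypothetical, simplified illustration (not the pooler's actual code); the replica names are made up:

```rust
// Illustrative sketch of round-robin selection over replicas.
// Not the pooler's implementation; names are hypothetical.
struct RoundRobin {
    replicas: Vec<&'static str>,
    next: usize,
}

impl RoundRobin {
    fn new(replicas: Vec<&'static str>) -> Self {
        RoundRobin { replicas, next: 0 }
    }

    // Each call returns the next replica, wrapping around,
    // so load spreads evenly across the pool.
    fn pick(&mut self) -> &'static str {
        let replica = self.replicas[self.next % self.replicas.len()];
        self.next += 1;
        replica
    }
}

fn main() {
    let mut rr = RoundRobin::new(vec!["replica1", "replica2", "replica3"]);
    let picks: Vec<_> = (0..4).map(|_| rr.pick()).collect();
    println!("{:?}", picks); // cycles back to replica1 on the 4th pick
}
```

With `primary_reads_enabled`, the primary would simply appear in the same list for read queries.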
The query parser is disabled by default.

#### Query parser
The query parser will do its best to determine where the query should go, but sometimes that's not possible. In that case, the client can select which server it wants using this custom SQL syntax:
```sql
-- To talk to the primary for the duration of the next transaction:
SET SERVER ROLE TO 'primary';
-- To talk to the replica for the duration of the next transaction:
SET SERVER ROLE TO 'replica';
-- Let the query parser decide
SET SERVER ROLE TO 'auto';
-- Pick any server at random
SET SERVER ROLE TO 'any';
-- Reset to default configured settings
SET SERVER ROLE TO 'default';
```
The setting will persist until it's changed again or the client disconnects.
By default, all queries are routed to all servers; the `default_role` setting controls this behavior.
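Because the role setting is sticky, a client session can be modeled as a tiny state machine. A hedged, illustrative sketch (not the pooler's internals; `Session` is a made-up name):

```rust
// Illustrative sketch: the selected server role persists across
// transactions until the client changes it or disconnects.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Role {
    Primary,
    Replica,
}

struct Session {
    default_role: Option<Role>, // the configured `default_role`
    active_role: Option<Role>,  // None = any server
}

impl Session {
    fn new(default_role: Option<Role>) -> Self {
        Session { default_role, active_role: default_role }
    }

    // `SET SERVER ROLE TO '...'` updates the sticky setting.
    fn set_role(&mut self, role: &str) {
        self.active_role = match role {
            "primary" => Some(Role::Primary),
            "replica" => Some(Role::Replica),
            "default" => self.default_role,
            _ => None, // 'any' / 'auto'
        };
    }

    // Every subsequent transaction sees the same role.
    fn route(&self) -> Option<Role> {
        self.active_role
    }
}

fn main() {
    let mut session = Session::new(None);
    session.set_role("replica");
    assert_eq!(session.route(), Some(Role::Replica)); // persists
    assert_eq!(session.route(), Some(Role::Replica)); // until changed
    session.set_role("default");
    assert_eq!(session.route(), None);
}
```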
### Failover
All servers are checked with a `SELECT 1` query before being given to a client. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
The ban time can be changed with `ban_time`. The default is 60 seconds.
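The ban list described above can be sketched with std types only. This is a hypothetical illustration of the mechanism (ban on failed health check, expiry after `ban_time`, clear-all as a false-positive safety valve); it is not the pooler's code, and note the real pooler additionally never bans the primary:

```rust
// Illustrative ban list: a failed health check bans a server for
// `ban_time`; if every server ends up banned, the list is cleared.
use std::time::{Duration, Instant};

struct BanList {
    bans: Vec<Option<Instant>>, // one slot per server
    ban_time: Duration,
}

impl BanList {
    fn new(servers: usize, ban_time: Duration) -> Self {
        BanList { bans: vec![None; servers], ban_time }
    }

    fn ban(&mut self, server: usize) {
        self.bans[server] = Some(Instant::now());

        // Safety valve against false positives: if all servers are
        // banned, clear the list instead of stranding clients.
        if self.bans.iter().all(|b| b.is_some()) {
            self.bans.iter_mut().for_each(|b| *b = None);
        }
    }

    fn is_banned(&self, server: usize) -> bool {
        match self.bans[server] {
            Some(when) => when.elapsed() < self.ban_time,
            None => false,
        }
    }
}

fn main() {
    let mut bans = BanList::new(3, Duration::from_secs(60));
    bans.ban(1);
    assert!(bans.is_banned(1));
    assert!(!bans.is_banned(0));
    bans.ban(0);
    bans.ban(2); // everything banned -> list cleared
    assert!(!bans.is_banned(2));
}
```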
### Sharding
We use the `PARTITION BY HASH` hashing function, the same as used by Postgres for declarative partitioning. This allows sharding the database using Postgres partitions and placing the partitions on different servers (shards). Both read and write queries can be routed to the shards using this pooler.
To route queries to a particular shard, we use this custom SQL syntax:
```sql
-- To talk to a shard explicitly
SET SHARD TO '1';
-- To let the pooler choose based on a value
SET SHARDING KEY TO '1234';
```
The active shard will last until it's changed again or the client disconnects. By default, the queries are routed to shard 0.

For hash function implementation, see `src/sharding.rs` and `tests/sharding/partition_hash_test_setup.sql`.
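The "hash the key, then modulo the shard count" idea can be illustrated with a toy sketch. Note this uses std's `DefaultHasher` as a stand-in only; the real pooler uses Postgres' `PARTITION BY HASH` function from `src/sharding.rs`, which produces different values:

```rust
// Toy illustration of sharding-key routing: hash the BIGINT key,
// take it modulo the number of shards. NOT the Postgres
// PARTITION BY HASH function the pooler actually uses.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn toy_shard(key: i64, shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % shards as u64) as usize
}

fn main() {
    let shards = 5;
    let shard = toy_shard(1234, shards);

    // The chosen shard is always within the configured range,
    // and the same key always maps to the same shard.
    assert!(shard < shards);
    assert_eq!(shard, toy_shard(1234, shards));
}
```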
#### ActiveRecord/Rails
```ruby
class User < ActiveRecord::Base
end
# Metadata will be fetched from shard 0
ActiveRecord::Base.establish_connection
# Grab a bunch of users from shard 1
User.connection.execute "SET SHARD TO '1'"
User.take(10)
# Using id as the sharding key
User.connection.execute "SET SHARDING KEY TO '1234'"
User.find_by_id(1234)
# Using geographical sharding
User.connection.execute "SET SERVER ROLE TO 'primary'"
User.connection.execute "SET SHARDING KEY TO '85'"
User.create(name: "test user", email: "test@example.com", zone_id: 85)
# Let the query parser figure out where the query should go.
# We are still on shard = hash(85) % shards.
User.connection.execute "SET SERVER ROLE TO 'auto'"
User.find_by_email("test@example.com")
```

#### Raw SQL
```sql
-- Grab a bunch of users from shard 1
SET SHARD TO '1';
SELECT * FROM users LIMIT 10;
-- Find by id
SET SHARDING KEY TO '1234';
SELECT * FROM users WHERE id = 1234;
-- Writing in a primary/replicas configuration.
SET SERVER ROLE TO 'primary';
SET SHARDING KEY TO '85';
INSERT INTO users (name, email, zone_id) VALUES ('test user', 'test@example.com', 85);

SET SERVER ROLE TO 'auto'; -- let the query router figure out where the query should go
SELECT * FROM users WHERE email = 'test@example.com'; -- shard setting lasts until set again; the query parser will route this SELECT to a replica
```
## Benchmarks


@@ -14,7 +14,7 @@ use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
use crate::pool::{ClientServerMap, ConnectionPool};
use crate::query_router::{Command, QueryRouter};
use crate::server::Server;
use crate::stats::Reporter;
@@ -198,29 +198,50 @@ impl Client {
// SET SHARDING KEY TO 'bigint';
let mut message = read_message(&mut self.read).await?;

// Handle all custom protocol commands here.
match query_router.try_execute_command(message.clone()) {
    // Normal query
    None => {
        if query_router.query_parser_enabled() && query_router.role() == None {
            query_router.infer_role(message.clone());
        }
    }

    Some((Command::SetShard, _)) | Some((Command::SetShardingKey, _)) => {
        custom_protocol_response_ok(&mut self.write, &format!("SET SHARD")).await?;
        continue;
    }

    Some((Command::SetServerRole, _)) => {
        custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
        continue;
    }

    Some((Command::ShowServerRole, value)) => {
        show_response(&mut self.write, "server role", &value).await?;
        continue;
    }

    Some((Command::ShowShard, value)) => {
        show_response(&mut self.write, "shard", &value).await?;
        continue;
    }
};

// Make sure we selected a valid shard.
if query_router.shard() >= pool.shards() {
    error_response(
        &mut self.write,
        &format!(
            "shard '{}' is more than configured '{}'",
            query_router.shard(),
            pool.shards()
        ),
    )
    .await?;
    continue;
}

// Grab a server from the pool: the client issued a regular query.
let connection = match pool.get(query_router.shard(), query_router.role()).await {
    Ok(conn) => conn,
@@ -228,7 +249,6 @@ impl Client {
        println!(">> Could not get connection from pool: {:?}", err);
        error_response(&mut self.write, "could not get connection from the pool")
            .await?;
        continue;
    }
};
@@ -310,9 +330,6 @@ impl Client {
if self.transaction_mode {
    // Report this client as idle.
    self.stats.client_idle();
    break;
}
}
@@ -395,9 +412,6 @@ impl Client {
if self.transaction_mode {
    self.stats.client_idle();
    break;
}
}
@@ -431,8 +445,7 @@ impl Client {
self.stats.transaction();

if self.transaction_mode {
    self.stats.client_idle();
    break;
}
}


@@ -38,6 +38,16 @@ pub async fn backend_key_data(
    Ok(write_all(stream, key_data).await?)
}
pub fn simple_query(query: &str) -> BytesMut {
    let mut res = BytesMut::from(&b"Q"[..]);
    let query = format!("{}\0", query);

    res.put_i32(query.len() as i32 + 4);
    res.put_slice(&query.as_bytes());

    res
}
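The framing this helper produces can be checked with a std-only sketch. This is illustrative (it uses `Vec<u8>` instead of `BytesMut`, and `frame_simple_query` is a hypothetical name), but the byte layout matches the Postgres simple-query message:

```rust
// Std-only sketch of the simple-query ('Q') wire format: tag byte,
// then an i32 length that counts itself and the terminating NUL
// (but not the tag), then the query string plus NUL.
fn frame_simple_query(query: &str) -> Vec<u8> {
    let payload = format!("{}\0", query);
    let len = payload.len() as i32 + 4; // +4 for the length field itself

    let mut buf = Vec::with_capacity(1 + 4 + payload.len());
    buf.push(b'Q');
    buf.extend_from_slice(&len.to_be_bytes());
    buf.extend_from_slice(payload.as_bytes());
    buf
}

fn main() {
    let msg = frame_simple_query("SELECT 1");
    assert_eq!(msg[0], b'Q');

    // "SELECT 1\0" is 9 bytes; length field = 9 + 4 = 13.
    let len = i32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    assert_eq!(len, 13);
    assert_eq!(msg.len(), 1 + 13); // tag + everything the length covers
}
```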
/// Tell the client we're ready for another query.
pub async fn ready_for_query(stream: &mut TcpStream) -> Result<(), Error> {
    let mut bytes = BytesMut::with_capacity(5);
@@ -229,6 +239,89 @@ pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Resul
    Ok(write_all_half(stream, res).await?)
}
/// Respond to a SHOW SHARD command.
pub async fn show_response(
    stream: &mut OwnedWriteHalf,
    name: &str,
    value: &str,
) -> Result<(), Error> {
    // A SELECT response consists of:
    // 1. RowDescription
    // 2. One or more DataRow
    // 3. CommandComplete
    // 4. ReadyForQuery

    // RowDescription
    let mut row_desc = BytesMut::new();

    // Number of columns: 1
    row_desc.put_i16(1);

    // Column name
    row_desc.put_slice(&format!("{}\0", name).as_bytes());

    // Doesn't belong to any table
    row_desc.put_i32(0);

    // Doesn't belong to any table
    row_desc.put_i16(0);

    // Text
    row_desc.put_i32(25);

    // Text size = variable (-1)
    row_desc.put_i16(-1);

    // Type modifier: none that I know
    row_desc.put_i32(0);

    // Format being used: text (0), binary (1)
    row_desc.put_i16(0);

    // DataRow
    let mut data_row = BytesMut::new();

    // Number of columns
    data_row.put_i16(1);

    // Size of the column content (length of the string really)
    data_row.put_i32(value.len() as i32);

    // The content
    data_row.put_slice(value.as_bytes());

    // CommandComplete
    let mut command_complete = BytesMut::new();

    // Number of rows returned (just one)
    command_complete.put_slice(&b"SELECT 1\0"[..]);

    // The final messages sent to the client
    let mut res = BytesMut::new();

    // RowDescription
    res.put_u8(b'T');
    res.put_i32(row_desc.len() as i32 + 4);
    res.put(row_desc);

    // DataRow
    res.put_u8(b'D');
    res.put_i32(data_row.len() as i32 + 4);
    res.put(data_row);

    // CommandComplete
    res.put_u8(b'C');
    res.put_i32(command_complete.len() as i32 + 4);
    res.put(command_complete);

    // ReadyForQuery
    res.put_u8(b'Z');
    res.put_i32(5);
    res.put_u8(b'I');

    write_all_half(stream, res).await
}
/// Write all data in the buffer to the TcpStream.
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
    match stream.write_all(&buf).await {


@@ -1,22 +1,32 @@
use crate::config::Role;
use crate::sharding::Sharder;
/// Route queries automatically based on explicitly requested
/// or implied query characteristics.
use bytes::{Buf, BytesMut};
use once_cell::sync::OnceCell;
use regex::RegexSet;
use sqlparser::ast::Statement::{Query, StartTransaction};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;

const CUSTOM_SQL_REGEXES: [&str; 5] = [
    r"(?i)SET SHARDING KEY TO '[0-9]+'",
    r"(?i)SET SHARD TO '[0-9]+'",
    r"(?i)SHOW SHARD",
    r"(?i)SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)'",
    r"(?i)SHOW SERVER ROLE",
];
#[derive(PartialEq, Debug)]
pub enum Command {
    SetShardingKey,
    SetShard,
    ShowShard,
    SetServerRole,
    ShowServerRole,
}

static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
pub struct QueryRouter {
    // By default, queries go here, unless we have better information
@@ -41,38 +51,18 @@ pub struct QueryRouter {
impl QueryRouter {
    pub fn setup() -> bool {
        let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) {
            Ok(rgx) => rgx,
            Err(err) => {
                log::error!("QueryRouter::setup Could not compile regex set: {:?}", err);
                return false;
            }
        };

        match CUSTOM_SQL_REGEX_SET.set(set) {
            Ok(_) => true,
            Err(_) => false,
        }
    }
    pub fn new(
@@ -92,104 +82,104 @@ impl QueryRouter {
        }
    }
    /// Try to parse a command and execute it.
    pub fn try_execute_command(&mut self, mut buf: BytesMut) -> Option<(Command, String)> {
        let code = buf.get_u8() as char;

        if code != 'Q' {
            return None;
        }

        let len = buf.get_i32() as usize;
        let query = String::from_utf8_lossy(&buf[..len - 5]).to_string(); // Ignore the terminating NULL.

        let regex_set = match CUSTOM_SQL_REGEX_SET.get() {
            Some(regex_set) => regex_set,
            None => return None,
        };

        let matches: Vec<_> = regex_set.matches(&query).into_iter().collect();

        if matches.len() != 1 {
            return None;
        }

        let command = match matches[0] {
            0 => Command::SetShardingKey,
            1 => Command::SetShard,
            2 => Command::ShowShard,
            3 => Command::SetServerRole,
            4 => Command::ShowServerRole,
            _ => unreachable!(),
        };

        let mut value = match command {
            Command::SetShardingKey | Command::SetShard | Command::SetServerRole => {
                query.split("'").collect::<Vec<&str>>()[1].to_string()
            }

            Command::ShowShard => self.shard().to_string(),

            Command::ShowServerRole => match self.active_role {
                Some(Role::Primary) => String::from("primary"),
                Some(Role::Replica) => String::from("replica"),
                None => {
                    if self.query_parser_enabled {
                        String::from("auto")
                    } else {
                        String::from("any")
                    }
                }
            },
        };

        match command {
            Command::SetShardingKey => {
                let sharder = Sharder::new(self.shards);
                let shard = sharder.pg_bigint_hash(value.parse::<i64>().unwrap());
                self.active_shard = Some(shard);
                value = shard.to_string();
            }

            Command::SetShard => {
                self.active_shard = Some(value.parse::<usize>().unwrap());
            }

            Command::SetServerRole => {
                self.active_role = match value.to_ascii_lowercase().as_ref() {
                    "primary" => {
                        self.query_parser_enabled = false;
                        Some(Role::Primary)
                    }

                    "replica" => {
                        self.query_parser_enabled = false;
                        Some(Role::Replica)
                    }

                    "any" => {
                        self.query_parser_enabled = false;
                        None
                    }

                    "auto" => {
                        self.query_parser_enabled = true;
                        None
                    }

                    "default" => {
                        // TODO: reset query parser to default here.
                        self.active_role = self.default_server_role;
                        self.active_role
                    }

                    _ => unreachable!(),
                };
            }

            _ => (),
        }

        Some((command, value))
    }
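The `len - 5` slice in `try_execute_command` (the i32 length counts itself, and the trailing NUL is dropped) can be sanity-checked with a std-only sketch. This is illustrative, using `Vec<u8>` instead of `BytesMut`, and `extract_query` is a hypothetical helper name:

```rust
// Sketch of how the query text is recovered from a simple-query
// message: after the 'Q' tag and the i32 length are consumed, the
// remaining payload is `len - 4` bytes, and dropping the terminating
// NUL leaves `len - 5`. Assumes a well-formed message.
fn extract_query(message: &[u8]) -> Option<String> {
    if message.first() != Some(&b'Q') {
        return None;
    }

    let len = i32::from_be_bytes([message[1], message[2], message[3], message[4]]) as usize;
    let payload = &message[5..]; // everything after tag + length field

    // Keep `len - 5` bytes: the payload without the terminating NUL.
    Some(String::from_utf8_lossy(&payload[..len - 5]).to_string())
}

fn main() {
    // Frame "SET SHARD TO '1'" (16 bytes) as a simple-query message:
    // length field = 16 + 1 (NUL) + 4 (itself) = 21.
    let query = "SET SHARD TO '1'";
    let mut message = vec![b'Q'];
    message.extend_from_slice(&((query.len() as i32) + 1 + 4).to_be_bytes());
    message.extend_from_slice(query.as_bytes());
    message.push(0);

    assert_eq!(extract_query(&message).unwrap(), query);
}
```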
    /// Try to infer which server to connect to based on the contents of the query.
@@ -270,13 +260,13 @@ impl QueryRouter {
    pub fn shard(&self) -> usize {
        match self.active_shard {
            Some(shard) => shard,
            None => 0,
        }
    }
    /// Reset the router back to defaults.
    /// This must be called at the end of every transaction in transaction mode.
    pub fn _reset(&mut self) {
        self.active_role = self.default_server_role;
        self.active_shard = None;
    }
@@ -290,87 +280,18 @@ impl QueryRouter {
#[cfg(test)]
mod test {
    use super::*;
    use crate::messages::simple_query;
    use bytes::BufMut;
    #[test]
    fn test_defaults() {
        QueryRouter::setup();
        let default_server_role: Option<Role> = None;
        let shards = 5;

        let qr = QueryRouter::new(default_server_role, shards, false, false);
        assert_eq!(qr.role(), None);
    }
#[test] #[test]
@@ -379,23 +300,20 @@ mod test {
        let default_server_role: Option<Role> = None;
        let shards = 5;

        let mut qr = QueryRouter::new(default_server_role, shards, false, false);

        let queries = vec![
            simple_query("SELECT * FROM items WHERE id = 5"),
            simple_query(
                "SELECT id, name, value FROM items INNER JOIN prices ON item.id = prices.item_id",
            ),
            simple_query("WITH t AS (SELECT * FROM items) SELECT * FROM t"),
        ];

        for query in queries {
            // It's a recognized query
            assert!(qr.infer_role(query));
            assert_eq!(qr.role(), Some(Role::Replica));
        }
    }
@@ -405,24 +323,19 @@ mod test {
        let default_server_role: Option<Role> = None;
        let shards = 5;
        let mut qr = QueryRouter::new(default_server_role, shards, false, false);

        let queries = vec![
            simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
            simple_query("INSERT INTO items (id, name) VALUES (5, 'pumpkin')"),
            simple_query("DELETE FROM items WHERE id = 5"),
            simple_query("BEGIN"), // Transaction start
        ];

        for query in queries {
            // It's a recognized query
            assert!(qr.infer_role(query));
            assert_eq!(qr.role(), Some(Role::Primary));
        }
    }
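The two tests above pin down the routing contract: statements whose first keyword is `SELECT` or `WITH` are inferred as reads and sent to a replica, while everything else (writes, and `BEGIN` which may start a write transaction) goes to the primary. A minimal first-keyword sketch consistent with those expectations — hypothetical, the real parser in `query_router.rs` may be more involved — looks like this:

```rust
// Hypothetical sketch of first-keyword role inference, mirroring the
// expectations in the tests above. Not the actual query_router.rs code.
#[derive(Debug, PartialEq)]
enum Role {
    Primary,
    Replica,
}

fn infer_role(query: &str) -> Role {
    // Route on the first keyword: reads can go to a replica; writes,
    // BEGIN, and anything unrecognized must go to the primary.
    match query.trim_start().split_whitespace().next() {
        Some(word) => match word.to_uppercase().as_str() {
            "SELECT" | "WITH" => Role::Replica,
            _ => Role::Primary,
        },
        None => Role::Primary,
    }
}
```

Defaulting unrecognized statements to the primary is the safe choice: misrouting a read costs some replica capacity, but misrouting a write would fail or corrupt replication.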
@@ -432,16 +345,11 @@ mod test {
        let default_server_role: Option<Role> = None;
        let shards = 5;
        let mut qr = QueryRouter::new(default_server_role, shards, true, false);

        let query = simple_query("SELECT * FROM items WHERE id = 5");
        assert!(qr.infer_role(query));
        assert_eq!(qr.role(), None);
    }
    #[test]
@@ -467,26 +375,112 @@ mod test {
    }

    #[test]
    fn test_regex_set() {
        QueryRouter::setup();

        let tests = [
            // Upper case
            "SET SHARDING KEY TO '1'",
            "SET SHARD TO '1'",
            "SHOW SHARD",
            "SET SERVER ROLE TO 'replica'",
            "SET SERVER ROLE TO 'primary'",
            "SET SERVER ROLE TO 'any'",
            "SET SERVER ROLE TO 'auto'",
            "SHOW SERVER ROLE",
            // Lower case
            "set sharding key to '1'",
            "set shard to '1'",
            "show shard",
            "set server role to 'replica'",
            "set server role to 'primary'",
            "set server role to 'any'",
            "set server role to 'auto'",
            "show server role",
        ];

        let set = CUSTOM_SQL_REGEX_SET.get().unwrap();

        for test in &tests {
            let matches: Vec<_> = set.matches(test).into_iter().collect();
            assert_eq!(matches.len(), 1);
        }
    }
    #[test]
    fn test_try_execute_command() {
        QueryRouter::setup();
        let mut qr = QueryRouter::new(Some(Role::Primary), 5, false, false);

        // SetShardingKey
        let query = simple_query("SET SHARDING KEY TO '13'");
        assert_eq!(
            qr.try_execute_command(query),
            Some((Command::SetShardingKey, String::from("3")))
        );
        assert_eq!(qr.shard(), 3); // See sharding.rs (we are using 5 shards on purpose in this test)

        // SetShard
        let query = simple_query("SET SHARD TO '1'");
        assert_eq!(
            qr.try_execute_command(query),
            Some((Command::SetShard, String::from("1")))
        );
        assert_eq!(qr.shard(), 1);

        // ShowShard
        let query = simple_query("SHOW SHARD");
        assert_eq!(
            qr.try_execute_command(query),
            Some((Command::ShowShard, String::from("1")))
        );

        // SetServerRole
        let roles = ["primary", "replica", "any", "auto", "primary"];
        let verify_roles = [
            Some(Role::Primary),
            Some(Role::Replica),
            None,
            None,
            Some(Role::Primary),
        ];
        let query_parser_enabled = [false, false, false, true, false];

        for (idx, role) in roles.iter().enumerate() {
            let query = simple_query(&format!("SET SERVER ROLE TO '{}'", role));
            assert_eq!(
                qr.try_execute_command(query),
                Some((Command::SetServerRole, String::from(*role)))
            );
            assert_eq!(qr.role(), verify_roles[idx]);
            assert_eq!(qr.query_parser_enabled(), query_parser_enabled[idx]);

            // ShowServerRole
            let query = simple_query("SHOW SERVER ROLE");
            assert_eq!(
                qr.try_execute_command(query),
                Some((Command::ShowServerRole, String::from(*role)))
            );
        }
    }
    #[test]
    fn test_enable_query_parser() {
        QueryRouter::setup();
        let mut qr = QueryRouter::new(None, 5, false, false);
        let query = simple_query("SET SERVER ROLE TO 'auto'");

        assert!(qr.try_execute_command(query) != None);
        assert!(qr.query_parser_enabled());
        assert_eq!(qr.role(), None);

        let query = simple_query("INSERT INTO test_table VALUES (1)");
        assert_eq!(qr.infer_role(query), true);
        assert_eq!(qr.role(), Some(Role::Primary));

        let query = simple_query("SELECT * FROM test_table");
        assert_eq!(qr.infer_role(query), true);
        assert_eq!(qr.role(), Some(Role::Replica));
    }
}
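The `test_try_execute_command` expectation above — sharding key `'13'` landing on shard 3 out of 5 — implies a key-to-shard mapping in `sharding.rs`. That file is not part of this diff; a plain modulo scheme is one hypothetical mapping that agrees with the test's numbers (the real implementation may instead use PostgreSQL's hash-partitioning function, to stay compatible with `PARTITION BY HASH` tables like the one created in the Ruby script below):

```rust
// Hypothetical sketch: map a numeric sharding key to a shard index.
// The actual function lives in sharding.rs and is not shown in this
// diff; plain modulo is assumed here only because it agrees with the
// `SET SHARDING KEY TO '13'` -> shard 3 (of 5) expectation above.
fn shard_for_key(key: u64, shards: u64) -> u64 {
    key % shards
}
```

Whatever the function is, it must match the partitioning scheme on the Postgres side, or rows written through the pooler would land in partitions other than the ones reads look in.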


@@ -1,7 +1,8 @@
require "active_record"

# Uncomment these two to see all queries.
# ActiveRecord.verbose_query_logs = true
# ActiveRecord::Base.logger = Logger.new(STDOUT)

ActiveRecord::Base.establish_connection(
  adapter: "postgresql",
@@ -18,6 +19,13 @@ class TestTable < ActiveRecord::Base
  self.table_name = "test_table"
end

class TestSafeTable < ActiveRecord::Base
  self.table_name = "test_safe_table"
end

class ShouldNeverHappenException < Exception
end

# Create the table.
class CreateTestTable < ActiveRecord::Migration[7.0]
  # Disable transactions or things will fly out of order!
@@ -30,6 +38,7 @@ class CreateTestTable < ActiveRecord::Migration[7.0]
      # This will make this migration reversible!
      reversible do
        connection.execute "SET SHARD TO '#{x.to_i}'"
        connection.execute "SET SERVER ROLE TO 'primary'"
      end

      # Always wrap the entire migration inside a transaction. If that's not possible,
@@ -47,28 +56,82 @@ class CreateTestTable < ActiveRecord::Migration[7.0]
  end
end
class CreateSafeShardedTable < ActiveRecord::Migration[7.0]
  # Disable transactions or things will fly out of order!
  disable_ddl_transaction!

  SHARDS = 3

  def up
    SHARDS.times do |x|
      # This will make this migration reversible!
      connection.execute "SET SHARD TO '#{x.to_i}'"
      connection.execute "SET SERVER ROLE TO 'primary'"

      connection.execute <<-SQL
        CREATE TABLE test_safe_table (
          id BIGINT PRIMARY KEY,
          name VARCHAR,
          description TEXT
        ) PARTITION BY HASH (id);

        CREATE TABLE test_safe_table_data PARTITION OF test_safe_table
        FOR VALUES WITH (MODULUS #{SHARDS.to_i}, REMAINDER #{x.to_i});
      SQL
    end
  end

  def down
    SHARDS.times do |x|
      connection.execute "SET SHARD TO '#{x.to_i}'"
      connection.execute "SET SERVER ROLE TO 'primary'"
      connection.execute "DROP TABLE test_safe_table CASCADE"
    end
  end
end

20.times do
  begin
    CreateTestTable.migrate(:down)
  rescue Exception
    puts "Tables don't exist yet"
  end

  begin
    CreateSafeShardedTable.migrate(:down)
  rescue Exception
    puts "Tables don't exist yet"
  end

  CreateTestTable.migrate(:up)
  CreateSafeShardedTable.migrate(:up)

  3.times do |x|
    TestSafeTable.connection.execute "SET SHARD TO '#{x.to_i}'"
    TestSafeTable.connection.execute "SET SERVER ROLE TO 'primary'"
    TestSafeTable.connection.execute "TRUNCATE #{TestTable.table_name}"
  end

  10.times do |x|
    x += 1 # Postgres ids start at 1
    TestSafeTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'"
    TestSafeTable.connection.execute "SET SERVER ROLE TO 'primary'"
    TestSafeTable.create(id: x, name: "something_special_#{x.to_i}", description: "It's a surprise!")
  end

  10.times do |x|
    x += 1 # 0 confuses our sharding function
    TestSafeTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'"
    TestSafeTable.connection.execute "SET SERVER ROLE TO 'replica'"
    TestSafeTable.find_by_id(x).id
  end
end

# Test wrong shard
TestSafeTable.connection.execute "SET SHARD TO '1'"

begin
  TestSafeTable.create(id: 5, name: "test", description: "test description")
  raise ShouldNeverHappenException, "Uh oh"
rescue ActiveRecord::StatementInvalid
  puts "OK"
end