From bbacb9cf01b25c70e6cabb73f815f2e2b677ca02 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 18 Feb 2022 09:43:07 -0800 Subject: [PATCH] Explicit shard selection; Rails tests (#24) * Explicit shard selection; Rails tests * try running ruby tests * try without lockfile * aha * ok --- .circleci/config.yml | 2 +- .circleci/run_tests.sh | 8 ++++ src/query_router.rs | 60 +++++++++++++++++++++++++++--- tests/ruby/.ruby-version | 1 + tests/ruby/Gemfile | 4 ++ tests/ruby/Gemfile.lock | 30 +++++++++++++++ tests/ruby/tests.rb | 79 ++++++++++++++++++++++++++++++++++++---- 7 files changed, 170 insertions(+), 14 deletions(-) create mode 100644 tests/ruby/.ruby-version create mode 100644 tests/ruby/Gemfile create mode 100644 tests/ruby/Gemfile.lock diff --git a/.circleci/config.yml b/.circleci/config.yml index 5feb5c5..f6180ad 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -25,7 +25,7 @@ jobs: key: cargo-lock-2-{{ checksum "Cargo.lock" }} - run: name: "Install dependencies" - command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12" + command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12 ruby ruby-dev libpq-dev" - run: name: "Build" command: "cargo build" diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 2ff3008..8a73102 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -34,6 +34,14 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > / # Replica/primary selection & more sharding tests psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null +# +# ActiveRecord tests! +# +cd tests/ruby +sudo gem install bundler +bundle install +ruby tests.rb + # Attempt clean shut down killall pgcat -s SIGINT diff --git a/src/query_router.rs b/src/query_router.rs index f5bdc99..c02e6a6 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -10,11 +10,13 @@ use sqlparser::parser::Parser; use crate::config::Role; use crate::sharding::Sharder; -const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+';"; -const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)';"; +const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+'"; +const SET_SHARD_REGEX: &str = r"SET SHARD TO '[0-9]+'"; +const ROLE_REGEX: &str = r"SET SERVER ROLE TO '(PRIMARY|REPLICA)'"; static SHARDING_REGEX_RE: OnceCell = OnceCell::new(); static ROLE_REGEX_RE: OnceCell = OnceCell::new(); +static SET_SHARD_REGEX_RE: OnceCell = OnceCell::new(); pub struct QueryRouter { // By default, queries go here, unless we have better information @@ -60,7 +62,17 @@ impl QueryRouter { Err(_) => false, }; - a && b + let c = match SET_SHARD_REGEX_RE.set( + RegexBuilder::new(SET_SHARD_REGEX) + .case_insensitive(true) + .build() + .unwrap(), + ) { + Ok(_) => true, + Err(_) => false, + }; + + a && b && c } pub fn new( @@ -99,12 +111,17 @@ impl QueryRouter { let len = buf.get_i32(); let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]); // Don't read the ternminating null - let rgx = match SHARDING_REGEX_RE.get() { + let sharding_key_rgx = match SHARDING_REGEX_RE.get() { Some(r) => r, None => return false, }; - if rgx.is_match(&query) { + let set_shard_rgx = match SET_SHARD_REGEX_RE.get() { + Some(r) => r, + None => return false, + }; + + if sharding_key_rgx.is_match(&query) { let shard = query.split("'").collect::>()[1]; match shard.parse::() { @@ -120,6 +137,15 @@ impl QueryRouter { // case anyway. Err(_) => false, } + } else if set_shard_rgx.is_match(&query) { + let shard = query.split("'").collect::>()[1]; + match shard.parse::() { + Ok(shard) => { + self.active_shard = Some(shard); + true + } + Err(_) => false, + } } else { false } @@ -439,4 +465,28 @@ mod test { assert!(query_router.infer_role(res)); assert_eq!(query_router.role(), Some(Role::Replica)); } + + #[test] + fn test_set_shard_explicitely() { + QueryRouter::setup(); + + let default_server_role: Option = None; + let shards = 5; + + let mut query_router = QueryRouter::new(default_server_role, shards, false, false); + + // Build the special syntax query. + let mut message = BytesMut::new(); + let query = BytesMut::from(&b"SET SHARD TO '1'\0"[..]); + + message.put_u8(b'Q'); // Query + message.put_i32(query.len() as i32 + 4); + message.put_slice(&query[..]); + + assert!(query_router.select_shard(message)); + assert_eq!(query_router.shard(), 1); // See sharding.rs (we are using 5 shards on purpose in this test) + + query_router.reset(); + assert_eq!(query_router.shard(), 0); + } } diff --git a/tests/ruby/.ruby-version b/tests/ruby/.ruby-version new file mode 100644 index 0000000..860487c --- /dev/null +++ b/tests/ruby/.ruby-version @@ -0,0 +1 @@ +2.7.1 diff --git a/tests/ruby/Gemfile b/tests/ruby/Gemfile new file mode 100644 index 0000000..94fb8c3 --- /dev/null +++ b/tests/ruby/Gemfile @@ -0,0 +1,4 @@ +source "https://rubygems.org" + +gem "pg" +gem "activerecord" diff --git a/tests/ruby/Gemfile.lock b/tests/ruby/Gemfile.lock new file mode 100644 index 0000000..7b7d448 --- /dev/null +++ b/tests/ruby/Gemfile.lock @@ -0,0 +1,30 @@ +GEM + remote: https://rubygems.org/ + specs: + activemodel (7.0.2.2) + activesupport (= 7.0.2.2) + activerecord (7.0.2.2) + activemodel (= 7.0.2.2) + activesupport (= 7.0.2.2) + activesupport (7.0.2.2) + concurrent-ruby (~> 1.0, >= 1.0.2) + i18n (>= 1.6, < 2) + minitest (>= 5.1) + tzinfo (~> 2.0) + concurrent-ruby (1.1.9) + i18n (1.10.0) + concurrent-ruby (~> 1.0) + minitest (5.15.0) + pg (1.3.2) + tzinfo (2.0.4) + concurrent-ruby (~> 1.0) + +PLATFORMS + x86_64-linux + +DEPENDENCIES + activerecord + pg + +BUNDLED WITH + 2.3.7 diff --git a/tests/ruby/tests.rb b/tests/ruby/tests.rb index f12440f..18d1bfb 100644 --- a/tests/ruby/tests.rb +++ b/tests/ruby/tests.rb @@ -1,11 +1,74 @@ -require 'pg' +require "active_record" -conn = PG.connect(host: '127.0.0.1', port: 5433, dbname: 'test') +ActiveRecord.verbose_query_logs = true +ActiveRecord::Base.logger = Logger.new(STDOUT) -conn.exec( "SELECT * FROM pg_stat_activity" ) do |result| - puts " PID | User | Query" - result.each do |row| - puts " %7d | %-16s | %s " % - row.values_at('pid', 'usename', 'query') +ActiveRecord::Base.establish_connection( + adapter: "postgresql", + host: "127.0.0.1", + port: 6432, + username: "sharding_user", + password: "sharding_user", + database: "rails_dev", + prepared_statements: false, # Transaction mode + advisory_locks: false, # Same +) + +class TestTable < ActiveRecord::Base + self.table_name = "test_table" +end + +# # Create the table. +class CreateTestTable < ActiveRecord::Migration[7.0] + # Disable transasctions or things will fly out of order! + disable_ddl_transaction! + + SHARDS = 3 + + def change + SHARDS.times do |x| + # This will make this migration reversible! + reversible do + connection.execute "SET SHARD TO '#{x.to_i}'" + end + + # Always wrap the entire migration inside a transaction. If that's not possible, + # execute a `SET SHARD` command before every statement and make sure AR doesn't need + # to load database information beforehand (i.e. it's not the first query in the migration). + connection.transaction do + create_table :test_table, if_not_exists: true do |t| + t.string :name + t.string :description + + t.timestamps + end + end + end end -end \ No newline at end of file +end + +begin + CreateTestTable.migrate(:down) +rescue Exception + puts "Tables don't exist yet" +end + +CreateTestTable.migrate(:up) + +10.times do |x| + x += 1 # Postgres ids start at 1 + r = TestTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'" + + # Always wrap writes inside explicit transactions like these because ActiveRecord may fetch table info + # before actually issuing the `INSERT` statement. This ensures that that happens inside a transaction + # and the write goes to the correct shard. + TestTable.connection.transaction do + TestTable.create(id: x, name: "something_special_#{x.to_i}", description: "It's a surprise!") + end +end + +10.times do |x| + x += 1 # 0 confuses our sharding function + TestTable.connection.execute "SET SHARDING KEY TO '#{x.to_i}'" + puts TestTable.find_by_id(x).id +end