From d48c04a7fb4b09c5c8c220975320f60a0d40d1c7 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Tue, 30 Aug 2022 11:14:53 -0500 Subject: [PATCH] Ruby integration tests (#147) * Ruby integration tests * forgot a file * refactor * refactoring * more refactoring * remove config helper * try multiple databases * fix * more databases * Use pg stats * ports * speed * Fix tests * preload library * comment --- .circleci/config.yml | 26 ++++- .circleci/run_tests.sh | 14 ++- tests/ruby/Gemfile | 6 +- tests/ruby/Gemfile.lock | 22 +++- tests/ruby/helpers/pg_instance.rb | 82 +++++++++++++ tests/ruby/helpers/pgcat_helper.rb | 100 ++++++++++++++++ tests/ruby/helpers/pgcat_process.rb | 116 +++++++++++++++++++ tests/ruby/load_balancing_spec.rb | 61 ++++++++++ tests/ruby/misc_spec.rb | 109 +++++++++++++++++ tests/ruby/routing_spec.rb | 81 +++++++++++++ tests/ruby/spec_helper.rb | 21 ++++ tests/ruby/tests.rb | 154 +------------------------ tests/sharding/query_routing_setup.sql | 12 ++ 13 files changed, 639 insertions(+), 165 deletions(-) create mode 100644 tests/ruby/helpers/pg_instance.rb create mode 100644 tests/ruby/helpers/pgcat_helper.rb create mode 100644 tests/ruby/helpers/pgcat_process.rb create mode 100644 tests/ruby/load_balancing_spec.rb create mode 100644 tests/ruby/misc_spec.rb create mode 100644 tests/ruby/routing_spec.rb create mode 100644 tests/ruby/spec_helper.rb diff --git a/.circleci/config.yml b/.circleci/config.yml index 1d3449a..98aaea7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -15,14 +15,34 @@ jobs: RUSTFLAGS: "-C instrument-coverage" LLVM_PROFILE_FILE: "pgcat-%m.profraw" - image: postgres:14 - # auth: - # username: mydockerhub-user - # password: $DOCKERHUB_PASSWORD + command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_HOST_AUTH_METHOD: scram-sha-256 + - image: postgres:14 + command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"] + environment: + POSTGRES_USER: postgres + POSTGRES_DB: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_HOST_AUTH_METHOD: scram-sha-256 + - image: postgres:14 + command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"] + environment: + POSTGRES_USER: postgres + POSTGRES_DB: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_HOST_AUTH_METHOD: scram-sha-256 + - image: postgres:14 + command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"] + environment: + POSTGRES_USER: postgres + POSTGRES_DB: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_HOST_AUTH_METHOD: scram-sha-256 + # Add steps to the job # See: https://circleci.com/docs/2.0/configuration-reference/#steps steps: diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 48303d7..1585ebd 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -16,6 +16,9 @@ function start_pgcat() { # Setup the database with shards and user PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql +PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f tests/sharding/query_routing_setup.sql +PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f tests/sharding/query_routing_setup.sql +PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f tests/sharding/query_routing_setup.sql PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i @@ -26,7 +29,7 @@ wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/downlo sudo dpkg -i toxiproxy-2.4.0.deb # Start Toxiproxy -toxiproxy-server & +LOG_LEVEL=error toxiproxy-server & sleep 1 # Create a database at port 5433, forward it to Postgres @@ -87,7 +90,8 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again cd tests/ruby sudo gem install bundler bundle install -ruby tests.rb +bundle exec ruby tests.rb +bundle exec rspec *_spec.rb cd ../.. # @@ -105,9 +109,9 @@ psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/n psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null -psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null -psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null -psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null +psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW LISTS' > /dev/null +psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW POOLS' > /dev/null +psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW VERSION' > /dev/null psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore (! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null) export PGPASSWORD=sharding_user diff --git a/tests/ruby/Gemfile b/tests/ruby/Gemfile index 7b01918..ec7cd3a 100644 --- a/tests/ruby/Gemfile +++ b/tests/ruby/Gemfile @@ -1,6 +1,8 @@ source "https://rubygems.org" gem "pg" -gem "activerecord" +gem "toml" +gem "rspec" gem "rubocop" -gem "toml", "~> 0.3.0" +gem "toxiproxy" +gem "activerecord" diff --git a/tests/ruby/Gemfile.lock b/tests/ruby/Gemfile.lock index 3fd0347..65d8bce 100644 --- a/tests/ruby/Gemfile.lock +++ b/tests/ruby/Gemfile.lock @@ -13,6 +13,7 @@ GEM tzinfo (~> 2.0) ast (2.4.2) concurrent-ruby (1.1.10) + diff-lcs (1.5.0) i18n (1.11.0) concurrent-ruby (~> 1.0) minitest (5.16.2) @@ -24,6 +25,19 @@ GEM rainbow (3.1.1) regexp_parser (2.3.1) rexml (3.2.5) + rspec (3.11.0) + rspec-core (~> 3.11.0) + rspec-expectations (~> 3.11.0) + rspec-mocks (~> 3.11.0) + rspec-core (3.11.0) + rspec-support (~> 3.11.0) + rspec-expectations (3.11.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.11.0) + rspec-mocks (3.11.1) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.11.0) + rspec-support (3.11.0) rubocop (1.29.0) parallel (~> 1.10) parser (>= 3.1.0.0) @@ -38,19 +52,23 @@ GEM ruby-progressbar (1.11.0) toml (0.3.0) parslet (>= 1.8.0, < 3.0.0) + toxiproxy (2.0.1) tzinfo (2.0.4) concurrent-ruby (~> 1.0) unicode-display_width (2.1.0) PLATFORMS + aarch64-linux arm64-darwin-21 x86_64-linux DEPENDENCIES activerecord pg + rspec rubocop - toml (~> 0.3.0) + toml + toxiproxy BUNDLED WITH - 2.3.7 + 2.3.21 diff --git a/tests/ruby/helpers/pg_instance.rb b/tests/ruby/helpers/pg_instance.rb new file mode 100644 index 0000000..3116457 --- /dev/null +++ b/tests/ruby/helpers/pg_instance.rb @@ -0,0 +1,82 @@ +require 'pg' +require 'toxiproxy' + +class PgInstance + attr_reader :port + attr_reader :username + attr_reader :password + attr_reader :database_name + + def initialize(port, username, password, database_name) + @original_port = port + @toxiproxy_port = 10000 + port.to_i + @port = @toxiproxy_port + + @username = username + @password = password + @database_name = database_name + @toxiproxy_name = "database_#{@original_port}" + Toxiproxy.populate([{ + name: @toxiproxy_name, + listen: "0.0.0.0:#{@toxiproxy_port}", + upstream: "localhost:#{@original_port}", + }]) + + # Toxiproxy server will outlive our PgInstance objects + # so we want to destroy our proxies before exiting + # Ruby finalizer is ideal for doing this + ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy }) + end + + def with_connection + conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}") + yield conn + ensure + conn&.close + end + + def reset + reset_toxics + reset_stats + end + + def toxiproxy + Toxiproxy[@toxiproxy_name] + end + + def take_down + if block_given? + Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield } + else + Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save) + end + end + + def add_latency(latency) + if block_given? + Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield } + else + Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save) + end + end + + def delete_proxy + Toxiproxy[@toxiproxy_name].delete + end + + def reset_toxics + Toxiproxy[@toxiproxy_name].toxics.each(&:destroy) + end + + def reset_stats + with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") } + end + + def count_query(query) + with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i } + end + + def count_select_1_plus_2 + with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i } + end +end diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb new file mode 100644 index 0000000..30b2bc8 --- /dev/null +++ b/tests/ruby/helpers/pgcat_helper.rb @@ -0,0 +1,100 @@ +require 'json' +require 'ostruct' +require_relative 'pgcat_process' +require_relative 'pg_instance' + +module Helpers + module Pgcat + def self.three_shard_setup(pool_name, pool_size) + user = { + "password" => "sharding_user", + "pool_size" => pool_size, + "statement_timeout" => 0, + "username" => "sharding_user" + } + + pgcat = PgcatProcess.new("info") + primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0") + primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1") + primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2") + + pgcat_cfg = pgcat.current_config + pgcat_cfg["pools"] = { + "#{pool_name}" => { + "default_role" => "any", + "pool_mode" => "transaction", + "primary_reads_enabled" => false, + "query_parser_enabled" => false, + "sharding_function" => "pg_bigint_hash", + "shards" => { + "0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] }, + "1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] }, + "2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] }, + }, + "users" => { "0" => user } + } + } + pgcat.update_config(pgcat_cfg) + + pgcat.start + pgcat.wait_until_ready + + OpenStruct.new.tap do |struct| + struct.pgcat = pgcat + struct.shards = [primary0, primary1, primary2] + struct.all_databases = [primary0, primary1, primary2] + end + end + + def self.single_shard_setup(pool_name, pool_size) + user = { + "password" => "sharding_user", + "pool_size" => pool_size, + "statement_timeout" => 0, + "username" => "sharding_user" + } + + pgcat = PgcatProcess.new("info") + pgcat_cfg = pgcat.current_config + + primary = PgInstance.new(5432, user["username"], user["password"], "shard0") + replica0 = PgInstance.new(7432, user["username"], user["password"], "shard0") + replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0") + replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0") + + # Main proxy configs + pgcat_cfg["pools"] = { + "#{pool_name}" => { + "default_role" => "any", + "pool_mode" => "transaction", + "primary_reads_enabled" => false, + "query_parser_enabled" => false, + "sharding_function" => "pg_bigint_hash", + "shards" => { + "0" => { + "database" => "shard0", + "servers" => [ + ["localhost", primary.port.to_s, "primary"], + ["localhost", replica0.port.to_s, "replica"], + ["localhost", replica1.port.to_s, "replica"], + ["localhost", replica2.port.to_s, "replica"] + ] + }, + }, + "users" => { "0" => user } + } + } + pgcat_cfg["general"]["port"] = pgcat.port + pgcat.update_config(pgcat_cfg) + pgcat.start + pgcat.wait_until_ready + + OpenStruct.new.tap do |struct| + struct.pgcat = pgcat + struct.primary = primary + struct.replicas = [replica0, replica1, replica2] + struct.all_databases = [primary, replica0, replica1, replica2] + end + end + end +end diff --git a/tests/ruby/helpers/pgcat_process.rb b/tests/ruby/helpers/pgcat_process.rb new file mode 100644 index 0000000..a5a6d3d --- /dev/null +++ b/tests/ruby/helpers/pgcat_process.rb @@ -0,0 +1,116 @@ +require 'pg' +require 'toml' +require 'fileutils' +require 'securerandom' + +class PgcatProcess + attr_reader :port + attr_reader :pid + + def self.finalize(pid, log_filename, config_filename) + `kill #{pid}` + File.delete(config_filename) if File.exist?(config_filename) + File.delete(log_filename) if File.exist?(log_filename) + end + + def initialize(log_level) + @env = {"RUST_LOG" => log_level} + @port = rand(20000..32760) + @log_level = log_level + @log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log" + @config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml" + + @command = "../../target/debug/pgcat #{@config_filename}" + + FileUtils.cp("../../pgcat.toml", @config_filename) + cfg = current_config + cfg["general"]["port"] = @port.to_i + cfg["general"]["enable_prometheus_exporter"] = false + + update_config(cfg) + end + + def logs + File.read(@log_filename) + end + + def update_config(config_hash) + @original_config = current_config + output_to_write = TOML::Generator.new(config_hash).body + output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,') + File.write(@config_filename, output_to_write) + end + + def current_config + old_cfg = File.read(@config_filename) + loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",') + TOML.load(loadable_string) + end + + def reload_config + `kill -s HUP #{@pid}` + sleep 0.1 + end + + def start + raise StandardError, "Process is already started" unless @pid.nil? + @pid = Process.spawn(@env, @command, err: @log_filename, out: @log_filename) + ObjectSpace.define_finalizer(@log_filename, proc { PgcatProcess.finalize(@pid, @log_filename, @config_filename) }) + + return self + end + + def wait_until_ready + exc = nil + 10.times do + PG::connect(example_connection_string).close + + return self + rescue => e + exc = e + sleep(0.5) + end + puts exc + raise StandardError, "Process #{@pid} never became ready. Logs #{logs}" + end + + def stop + `kill #{@pid}` + sleep 0.1 + end + + def shutdown + stop + File.delete(@config_filename) if File.exist?(@config_filename) + File.delete(@log_filename) if File.exist?(@log_filename) + end + + def admin_connection_string + cfg = current_config + username = cfg["general"]["admin_username"] + password = cfg["general"]["admin_password"] + + "postgresql://#{username}:#{password}@0.0.0.0:#{@port}/pgcat" + end + + def connection_string(pool_name, username) + cfg = current_config + + user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username } + password = user_obj["password"] + + "postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{pool_name}" + end + + def example_connection_string + cfg = current_config + first_pool_name = cfg["pools"].keys[0] + + db_name = first_pool_name + + username = cfg["pools"][first_pool_name]["users"]["0"]["username"] + password = cfg["pools"][first_pool_name]["users"]["0"]["password"] + + "postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}" + end +end diff --git a/tests/ruby/load_balancing_spec.rb b/tests/ruby/load_balancing_spec.rb new file mode 100644 index 0000000..bd98a83 --- /dev/null +++ b/tests/ruby/load_balancing_spec.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true +require_relative 'spec_helper' + +describe "Load Balancing" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) } + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + context "under regular circumstances" do + it "balances query volume between all instances" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + + query_count = QUERY_COUNT + expected_share = query_count / processes.all_databases.count + failed_count = 0 + + query_count.times do + conn.async_exec("SELECT 1 + 2") + rescue + failed_count += 1 + end + + expect(failed_count).to eq(0) + processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share| + expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + end + end + end + + context "when some replicas are down" do + it "balances query volume between working instances" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + expected_share = QUERY_COUNT / (processes.all_databases.count - 2) + failed_count = 0 + + processes[:replicas][0].take_down do + processes[:replicas][1].take_down do + QUERY_COUNT.times do + conn.async_exec("SELECT 1 + 2") + rescue + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count += 1 + end + end + end + + expect(failed_count).to eq(2) + processes.all_databases.each do |instance| + queries_routed = instance.count_select_1_plus_2 + if processes.replicas[0..1].include?(instance) + expect(queries_routed).to eq(0) + else + expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + end + end + end + end +end + diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb new file mode 100644 index 0000000..9aee49a --- /dev/null +++ b/tests/ruby/misc_spec.rb @@ -0,0 +1,109 @@ +# frozen_string_literal: true +require_relative 'spec_helper' + +describe "Miscellaneous" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) } + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + describe "Extended Protocol handling" do + it "does not send packets that client does not expect during extended protocol sequence" do + new_configs = processes.pgcat.current_config + + new_configs["general"]["connect_timeout"] = 500 + new_configs["general"]["ban_time"] = 1 + new_configs["general"]["shutdown_timeout"] = 1 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + + 25.times do + Thread.new do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError + ensure + conn&.close + end + end + + sleep(0.5) + conn_under_test = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + stdout, stderr = with_captured_stdout_stderr do + 15.times do |i| + conn_under_test.async_exec("SELECT 1") rescue PG::SystemError + conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError + sleep 1 + end + end + + raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle") + end + end + + describe "Pool recycling after config reload" do + let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) } + + it "should update pools for new clients and clients that are no longer in transaction" do + server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + server_conn.async_exec("BEGIN") + + # No config change yet, client should set old configs + current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"] + expect(current_datebase_from_pg).to eq('shard0') + + # Swap shards + new_config = processes.pgcat.current_config + shard0 = new_config["pools"]["sharded_db"]["shards"]["0"] + shard1 = new_config["pools"]["sharded_db"]["shards"]["1"] + new_config["pools"]["sharded_db"]["shards"]["0"] = shard1 + new_config["pools"]["sharded_db"]["shards"]["1"] = shard0 + + # Reload config + processes.pgcat.update_config(new_config) + processes.pgcat.reload_config + sleep 0.5 + + # Config changed but transaction is in progress, client should set old configs + current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"] + expect(current_datebase_from_pg).to eq('shard0') + server_conn.async_exec("COMMIT") + + # Transaction finished, client should get new configs + current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"] + expect(current_datebase_from_pg).to eq('shard1') + + # New connection should get new configs + server_conn.close() + server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"] + expect(current_datebase_from_pg).to eq('shard1') + end + end + + describe "Clients closing connection in the middle of transaction" do + it "sends a rollback to the server" do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + conn.async_exec("BEGIN") + conn.close + + expect(processes.primary.count_query("ROLLBACK")).to eq(1) + expect(processes.primary.count_query("DISCARD ALL")).to eq(1) + end + end + + describe "Server version reporting" do + it "reports correct version for normal and admin databases" do + server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + expect(server_conn.server_version).not_to eq(0) + server_conn.close + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + expect(admin_conn.server_version).not_to eq(0) + admin_conn.close + end + end +end diff --git a/tests/ruby/routing_spec.rb b/tests/ruby/routing_spec.rb new file mode 100644 index 0000000..24ea137 --- /dev/null +++ b/tests/ruby/routing_spec.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true +require_relative 'spec_helper' + + +describe "Routing" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) } + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + describe "SET ROLE" do + context "primary" do + it "routes queries only to primary" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + + query_count = 30 + failed_count = 0 + + query_count.times do + conn.async_exec("SELECT 1 + 2") + rescue + failed_count += 1 + end + + expect(failed_count).to eq(0) + processes.replicas.map(&:count_select_1_plus_2).each do |instance_share| + expect(instance_share).to eq(0) + end + + expect(processes.primary.count_select_1_plus_2).to eq(query_count) + end + end + context "replica" do + it "routes queries only to replicas" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'replica'") + + expected_share = QUERY_COUNT / processes.replicas.count + failed_count = 0 + + QUERY_COUNT.times do + conn.async_exec("SELECT 1 + 2") + rescue + failed_count += 1 + end + + expect(failed_count).to eq(0) + + processes.replicas.map(&:count_select_1_plus_2).each do |instance_share| + expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + end + + expect(processes.primary.count_select_1_plus_2).to eq(0) + end + end + + context "any" do + it "routes queries to all instances" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'any'") + + expected_share = QUERY_COUNT / processes.all_databases.count + failed_count = 0 + + QUERY_COUNT.times do + conn.async_exec("SELECT 1 + 2") + rescue + failed_count += 1 + end + + expect(failed_count).to eq(0) + + processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share| + expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + end + end + end + end +end diff --git a/tests/ruby/spec_helper.rb b/tests/ruby/spec_helper.rb new file mode 100644 index 0000000..3050e18 --- /dev/null +++ b/tests/ruby/spec_helper.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +require 'pg' +require_relative 'helpers/pgcat_helper' + +QUERY_COUNT = 300 +MARGIN_OF_ERROR = 0.30 + +def with_captured_stdout_stderr + sout = STDOUT.clone + serr = STDERR.clone + STDOUT.reopen("/tmp/out.txt", "w+") + STDERR.reopen("/tmp/err.txt", "w+") + STDOUT.sync = true + STDERR.sync = true + yield + return File.read('/tmp/out.txt'), File.read('/tmp/err.txt') +ensure + STDOUT.reopen(sout) + STDERR.reopen(serr) +end diff --git a/tests/ruby/tests.rb b/tests/ruby/tests.rb index d7b928d..1c18e15 100644 --- a/tests/ruby/tests.rb +++ b/tests/ruby/tests.rb @@ -1,93 +1,6 @@ # frozen_string_literal: true - -require 'active_record' require 'pg' -require 'toml' - -$stdout.sync = true -$stderr.sync = true - -class ConfigEditor - def initialize - @original_config_text = File.read('../../.circleci/pgcat.toml') - text_to_load = @original_config_text.gsub("5432", "\"5432\"") - - @original_configs = TOML.load(text_to_load) - end - - def original_configs - TOML.load(TOML::Generator.new(@original_configs).body) - end - - def with_modified_configs(new_configs) - text_to_write = TOML::Generator.new(new_configs).body - text_to_write = text_to_write.gsub("\"5432\"", "5432") - File.write('../../.circleci/pgcat.toml', text_to_write) - yield - ensure - File.write('../../.circleci/pgcat.toml', @original_config_text) - end -end - -def with_captured_stdout_stderr - sout = STDOUT.clone - serr = STDERR.clone - STDOUT.reopen("/tmp/out.txt", "w+") - STDERR.reopen("/tmp/err.txt", "w+") - STDOUT.sync = true - STDERR.sync = true - yield - return File.read('/tmp/out.txt'), File.read('/tmp/err.txt') -ensure - STDOUT.reopen(sout) - STDERR.reopen(serr) -end - - -def test_extended_protocol_pooler_errors - admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat") - - conf_editor = ConfigEditor.new - new_configs = conf_editor.original_configs - - # shorter timeouts - new_configs["general"]["connect_timeout"] = 500 - new_configs["general"]["ban_time"] = 1 - new_configs["general"]["shutdown_timeout"] = 1 - new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 - new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1 - - conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") } - - conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db" - 10.times do - Thread.new do - conn = PG::connect(conn_str) - conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError - ensure - conn&.close - end - end - - sleep(0.5) - conn_under_test = PG::connect(conn_str) - stdout, stderr = with_captured_stdout_stderr do - 5.times do |i| - conn_under_test.async_exec("SELECT 1") rescue PG::SystemError - conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError - sleep 1 - end - end - - raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle") - puts "Pool checkout errors not breaking clients passed" -ensure - sleep 1 - admin_conn.async_exec("RELOAD") # Reset state - conn_under_test&.close -end - -test_extended_protocol_pooler_errors +require 'active_record' # Uncomment these two to see all queries. # ActiveRecord.verbose_query_logs = true @@ -198,68 +111,3 @@ begin rescue ActiveRecord::StatementInvalid puts 'OK' end - -# Test evil clients -def poorly_behaved_client - conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat") - conn.async_exec 'BEGIN' - conn.async_exec 'SELECT 1' - - conn.close - puts 'Bad client ok' -end - -25.times do - poorly_behaved_client -end - - -def test_server_parameters - server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat") - raise StandardError, "Bad server version" if server_conn.server_version == 0 - server_conn.close - - admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat") - raise StandardError, "Bad server version" if admin_conn.server_version == 0 - admin_conn.close - - puts 'Server parameters ok' -end - - -def test_reload_pool_recycling - admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat") - server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat") - - server_conn.async_exec("BEGIN") - conf_editor = ConfigEditor.new - new_configs = conf_editor.original_configs - - # swap shards - new_configs["pools"]["sharded_db"]["shards"]["0"]["database"] = "shard1" - new_configs["pools"]["sharded_db"]["shards"]["1"]["database"] = "shard0" - - raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0' - conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") } - raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0' - server_conn.async_exec("COMMIT;") - - # Transaction finished, client should get new configs - raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1' - server_conn.close() - - # New connection should get new configs - server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat") - raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1' - -ensure - admin_conn.async_exec("RELOAD") # Go back to old state - admin_conn.close - server_conn.close - puts "Pool Recycling okay!" -end - -test_reload_pool_recycling - - - diff --git a/tests/sharding/query_routing_setup.sql b/tests/sharding/query_routing_setup.sql index 384b234..c25ac18 100644 --- a/tests/sharding/query_routing_setup.sql +++ b/tests/sharding/query_routing_setup.sql @@ -70,23 +70,35 @@ GRANT CONNECT ON DATABASE shard2 TO other_user; GRANT CONNECT ON DATABASE some_db TO simple_user; \c shard0 +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user; GRANT ALL ON SCHEMA public TO sharding_user; GRANT ALL ON TABLE data TO sharding_user; GRANT ALL ON SCHEMA public TO other_user; GRANT ALL ON TABLE data TO other_user; +GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user; \c shard1 +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user; GRANT ALL ON SCHEMA public TO sharding_user; GRANT ALL ON TABLE data TO sharding_user; GRANT ALL ON SCHEMA public TO other_user; GRANT ALL ON TABLE data TO other_user; +GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user; + \c shard2 +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user; GRANT ALL ON SCHEMA public TO sharding_user; GRANT ALL ON TABLE data TO sharding_user; GRANT ALL ON SCHEMA public TO other_user; GRANT ALL ON TABLE data TO other_user; +GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user; \c some_db +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO simple_user; GRANT ALL ON SCHEMA public TO simple_user; GRANT ALL ON TABLE data TO simple_user;