From c054ff068d99dbce2718dd0a2c4424fe11d1837a Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Tue, 23 Aug 2022 13:02:23 -0500 Subject: [PATCH] Avoid sending `Z` packet in the middle of extended protocol packet sequence if we fail to get connection from pool (#137) * Failing test * maybe * try fail * try * add message * pool size * correct user * more * debug * try fix * see stdout * stick? * fix configs * modify * types * m * maybe * make tests idempotent * hopefully fails * Add client fix * revert pgcat.toml change * Fix tests --- .circleci/run_tests.sh | 9 ++-- src/client.rs | 17 +++++-- tests/ruby/tests.rb | 110 ++++++++++++++++++++++++++++++++--------- 3 files changed, 106 insertions(+), 30 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 645ff94..4202eb5 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -19,8 +19,8 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i # Install Toxiproxy to simulate a downed/slow database -wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb -sudo dpkg -i toxiproxy-2.1.4.deb +wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb +sudo dpkg -i toxiproxy-2.4.0.deb # Start Toxiproxy toxiproxy-server & @@ -129,11 +129,14 @@ toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica start_pgcat "info" # Test session mode (and config reload) -sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml +sed -i '0,/simple_db/s/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml # Reload config test kill -SIGHUP $(pgrep pgcat) +# Revert settings after reload. Makes test runs idempotent +sed -i '0,/simple_db/s/pool_mode = "session"/pool_mode = "transaction"/' .circleci/pgcat.toml + sleep 1 # Prepared statements that will only work in session mode diff --git a/src/client.rs b/src/client.rs index c4866a0..0612c27 100644 --- a/src/client.rs +++ b/src/client.rs @@ -643,9 +643,20 @@ where conn } Err(err) => { - error!("Could not get connection from pool: {:?}", err); - error_response(&mut self.write, "could not get connection from the pool") - .await?; + // Clients do not expect to get SystemError followed by ReadyForQuery in the middle + // of extended protocol submission. So we will hold off on sending the actual error + // message to the client until we get 'S' message + match message[0] as char { + 'P' | 'B' | 'E' | 'D' => (), + _ => { + error!("Could not get connection from pool: {:?}", err); + error_response( + &mut self.write, + "could not get connection from the pool", + ) + .await?; + } + } continue; } }; diff --git a/tests/ruby/tests.rb b/tests/ruby/tests.rb index ba9476f..d7b928d 100644 --- a/tests/ruby/tests.rb +++ b/tests/ruby/tests.rb @@ -5,6 +5,89 @@ require 'pg' require 'toml' $stdout.sync = true +$stderr.sync = true + +class ConfigEditor + def initialize + @original_config_text = File.read('../../.circleci/pgcat.toml') + text_to_load = @original_config_text.gsub("5432", "\"5432\"") + + @original_configs = TOML.load(text_to_load) + end + + def original_configs + TOML.load(TOML::Generator.new(@original_configs).body) + end + + def with_modified_configs(new_configs) + text_to_write = TOML::Generator.new(new_configs).body + text_to_write = text_to_write.gsub("\"5432\"", "5432") + File.write('../../.circleci/pgcat.toml', text_to_write) + yield + ensure + File.write('../../.circleci/pgcat.toml', @original_config_text) + end +end + +def with_captured_stdout_stderr + sout = STDOUT.clone + serr = STDERR.clone + STDOUT.reopen("/tmp/out.txt", "w+") + STDERR.reopen("/tmp/err.txt", "w+") + STDOUT.sync = true + STDERR.sync = true + yield + return File.read('/tmp/out.txt'), File.read('/tmp/err.txt') +ensure + STDOUT.reopen(sout) + STDERR.reopen(serr) +end + + +def test_extended_protocol_pooler_errors + admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat") + + conf_editor = ConfigEditor.new + new_configs = conf_editor.original_configs + + # shorter timeouts + new_configs["general"]["connect_timeout"] = 500 + new_configs["general"]["ban_time"] = 1 + new_configs["general"]["shutdown_timeout"] = 1 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1 + + conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") } + + conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db" + 10.times do + Thread.new do + conn = PG::connect(conn_str) + conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError + ensure + conn&.close + end + end + + sleep(0.5) + conn_under_test = PG::connect(conn_str) + stdout, stderr = with_captured_stdout_stderr do + 5.times do |i| + conn_under_test.async_exec("SELECT 1") rescue PG::SystemError + conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError + sleep 1 + end + end + + raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle") + puts "Pool checkout errors not breaking clients passed" +ensure + sleep 1 + admin_conn.async_exec("RELOAD") # Reset state + conn_under_test&.close +end + +test_extended_protocol_pooler_errors # Uncomment these two to see all queries. # ActiveRecord.verbose_query_logs = true @@ -144,30 +227,6 @@ def test_server_parameters end -class ConfigEditor - def initialize - @original_config_text = File.read('../../.circleci/pgcat.toml') - text_to_load = @original_config_text.gsub("5432", "\"5432\"") - - @original_configs = TOML.load(text_to_load) - end - - def original_configs - TOML.load(TOML::Generator.new(@original_configs).body) - end - - def with_modified_configs(new_configs) - text_to_write = TOML::Generator.new(new_configs).body - text_to_write = text_to_write.gsub("\"5432\"", "5432") - File.write('../../.circleci/pgcat.toml', text_to_write) - yield - ensure - File.write('../../.circleci/pgcat.toml', @original_config_text) - end - -end - - def test_reload_pool_recycling admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat") server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat") @@ -201,3 +260,6 @@ ensure end test_reload_pool_recycling + + +