Avoid sending Z packet in the middle of extended protocol packet sequence if we fail to get connection from pool (#137)

* Failing test

* maybe

* try fail

* try

* add message

* pool size

* correct user

* more

* debug

* try fix

* see stdout

* stick?

* fix configs

* modify

* types

* m

* maybe

* make tests idempotent

* hopefully fails

* Add client fix

* revert pgcat.toml change

* Fix tests
This commit is contained in:
Mostafa Abdelraouf
2022-08-23 13:02:23 -05:00
committed by GitHub
parent 5a0cea6a24
commit c054ff068d
3 changed files with 106 additions and 30 deletions

View File

@@ -19,8 +19,8 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Install Toxiproxy to simulate a downed/slow database
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
sudo dpkg -i toxiproxy-2.1.4.deb
wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb
sudo dpkg -i toxiproxy-2.4.0.deb
# Start Toxiproxy
toxiproxy-server &
@@ -129,11 +129,14 @@ toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica
start_pgcat "info"
# Test session mode (and config reload)
sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml
sed -i '0,/simple_db/s/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml
# Reload config test
kill -SIGHUP $(pgrep pgcat)
# Revert settings after reload. Makes test runs idempotent
sed -i '0,/simple_db/s/pool_mode = "session"/pool_mode = "transaction"/' .circleci/pgcat.toml
sleep 1
# Prepared statements that will only work in session mode

View File

@@ -643,9 +643,20 @@ where
conn
}
Err(err) => {
error!("Could not get connection from pool: {:?}", err);
error_response(&mut self.write, "could not get connection from the pool")
.await?;
// Clients do not expect to get SystemError followed by ReadyForQuery in the middle
// of extended protocol submission. So we will hold off on sending the actual error
// message to the client until we get 'S' message
match message[0] as char {
'P' | 'B' | 'E' | 'D' => (),
_ => {
error!("Could not get connection from pool: {:?}", err);
error_response(
&mut self.write,
"could not get connection from the pool",
)
.await?;
}
}
continue;
}
};

View File

@@ -5,6 +5,89 @@ require 'pg'
require 'toml'
$stdout.sync = true
$stderr.sync = true
class ConfigEditor
def initialize
@original_config_text = File.read('../../.circleci/pgcat.toml')
text_to_load = @original_config_text.gsub("5432", "\"5432\"")
@original_configs = TOML.load(text_to_load)
end
def original_configs
TOML.load(TOML::Generator.new(@original_configs).body)
end
def with_modified_configs(new_configs)
text_to_write = TOML::Generator.new(new_configs).body
text_to_write = text_to_write.gsub("\"5432\"", "5432")
File.write('../../.circleci/pgcat.toml', text_to_write)
yield
ensure
File.write('../../.circleci/pgcat.toml', @original_config_text)
end
end
def with_captured_stdout_stderr
sout = STDOUT.clone
serr = STDERR.clone
STDOUT.reopen("/tmp/out.txt", "w+")
STDERR.reopen("/tmp/err.txt", "w+")
STDOUT.sync = true
STDERR.sync = true
yield
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
ensure
STDOUT.reopen(sout)
STDERR.reopen(serr)
end
def test_extended_protocol_pooler_errors
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
conf_editor = ConfigEditor.new
new_configs = conf_editor.original_configs
# shorter timeouts
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db"
10.times do
Thread.new do
conn = PG::connect(conn_str)
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
ensure
conn&.close
end
end
sleep(0.5)
conn_under_test = PG::connect(conn_str)
stdout, stderr = with_captured_stdout_stderr do
5.times do |i|
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
sleep 1
end
end
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
puts "Pool checkout errors not breaking clients passed"
ensure
sleep 1
admin_conn.async_exec("RELOAD") # Reset state
conn_under_test&.close
end
test_extended_protocol_pooler_errors
# Uncomment these two to see all queries.
# ActiveRecord.verbose_query_logs = true
@@ -144,30 +227,6 @@ def test_server_parameters
end
class ConfigEditor
def initialize
@original_config_text = File.read('../../.circleci/pgcat.toml')
text_to_load = @original_config_text.gsub("5432", "\"5432\"")
@original_configs = TOML.load(text_to_load)
end
def original_configs
TOML.load(TOML::Generator.new(@original_configs).body)
end
def with_modified_configs(new_configs)
text_to_write = TOML::Generator.new(new_configs).body
text_to_write = text_to_write.gsub("\"5432\"", "5432")
File.write('../../.circleci/pgcat.toml', text_to_write)
yield
ensure
File.write('../../.circleci/pgcat.toml', @original_config_text)
end
end
def test_reload_pool_recycling
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
@@ -201,3 +260,6 @@ ensure
end
test_reload_pool_recycling