mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
We identified a bug where RELOAD fails to update the pools. To reproduce, start at some config state, modify that state a bit, reload, revert the configs back to the original state, and reload again. The last reload will fail to update the pool because PgCat "thinks" the pool state didn't change. This is because we use a HashSet to keep track of config hashes but never remove values from it. Say we start with State A, then modify the pool configs to State B and reload. Now the POOL_HASHES struct has both State A and State B. Attempting to go back to State A will encounter a HashSet hit, which PgCat interprets as "Configs are the same, no need to reload pools". We fix this by attaching a config_hash value to the ConnectionPool object, calculated when we create the pool. This eliminates the need for a global variable. One shortcoming is that changing any config under one user in the pool will trigger a reload for the entire pool (which is fine, I think).
313 lines
13 KiB
Ruby
313 lines
13 KiB
Ruby
# frozen_string_literal: true
|
|
require_relative 'spec_helper'
|
|
|
|
describe "Miscellaneous" do
|
|
# Default fixture for the examples below, memoized per example by `let`.
# NOTE(review): the arguments are interpreted by Helpers::Pgcat.single_shard_setup
# (presumably pool name and pool size) — confirm against the helper.
let(:processes) do
  Helpers::Pgcat.single_shard_setup("sharded_db", 5)
end

# After every example: reset each database helper, then shut PgCat down.
after do
  processes.all_databases.each(&:reset)
  processes.pgcat.shutdown
end
|
|
|
|
context "when adding then removing instance using RELOAD" do
  # Adds a server entry to shard 0, reloads, removes it again, reloads, and
  # checks after every step that the admin SHOW DATABASES row count equals
  # the number of configured servers.
  it "works correctly" do
    admin_conn = PG.connect(processes.pgcat.admin_connection_string)
    listed_databases = -> { admin_conn.async_exec("SHOW DATABASES").count }

    current_configs = processes.pgcat.current_config
    # This is the array stored inside current_configs, so the in-place edits
    # below are what update_config receives.
    servers = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"]
    expect(listed_databases.call).to eq(servers.count)

    # Copy the last server entry and overwrite its first element.
    # NOTE(review): element 0 is presumably the host — confirm against the config format.
    extra_replica = servers.last.clone
    extra_replica[0] = "127.0.0.1"
    servers << extra_replica

    processes.pgcat.update_config(current_configs) # with replica added
    processes.pgcat.reload_config
    expect(listed_databases.call).to eq(servers.count)

    servers.pop

    processes.pgcat.update_config(current_configs) # with replica removed again
    processes.pgcat.reload_config
    expect(listed_databases.call).to eq(servers.count)
  end
end
|
|
|
|
context "when removing then adding instance back using RELOAD" do
  # Removes the last server entry of shard 0, reloads, puts the same entry
  # back, reloads, and checks after every step that the admin SHOW DATABASES
  # row count equals the number of configured servers.
  it "works correctly" do
    admin_conn = PG.connect(processes.pgcat.admin_connection_string)
    listed_databases = -> { admin_conn.async_exec("SHOW DATABASES").count }

    current_configs = processes.pgcat.current_config
    # This is the array stored inside current_configs, so the in-place edits
    # below are what update_config receives.
    servers = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"]
    expect(listed_databases.call).to eq(servers.count)

    removed_replica = servers.pop
    processes.pgcat.update_config(current_configs) # with replica removed
    processes.pgcat.reload_config
    expect(listed_databases.call).to eq(servers.count)

    servers << removed_replica

    processes.pgcat.update_config(current_configs) # with replica added again
    processes.pgcat.reload_config
    expect(listed_databases.call).to eq(servers.count)
  end
end
|
|
|
|
describe "TCP Keepalives" do
  # Ideally, we would block TCP traffic to the database using iptables to
  # mimic a passive failure (the connection is dropped without a RST packet),
  # but we cannot do this in CircleCI because iptables requires the NET_ADMIN
  # capability, which we cannot enable there.
  # Toxiproxy won't work either because it does not block keepalives,
  # so our best bet is to query the OS keepalive params set on the socket.

  # Returns the lines printed by `ss` for established TCP sockets whose
  # destination port is one of the database ports, minus the first (header) line.
  def established_database_sockets
    port_filter = processes.all_databases.map { |db| "dport = :#{db.port}" }.join(" or ")
    `ss -t4 state established -o -at '( #{port_filter} )'`.lines.drop(1)
  end

  context "default settings" do
    it "applies default keepalive settings" do
      # We query the ss command to verify that the correct keepalive values are set.
      # We can only verify the keepalives_idle parameter, but that's good enough.
      # Example output:
      #Recv-Q Send-Q Local Address:Port Peer Address:Port Process
      #0 0 127.0.0.1:60526 127.0.0.1:18432 timer:(keepalive,1min59sec,0)
      #0 0 127.0.0.1:60664 127.0.0.1:19432 timer:(keepalive,4.123ms,0)
      established_database_sockets.each do |line|
        expect(line).to match(/timer:\(keepalive,.*ms,0\)/)
      end
    end
  end

  context "changed settings" do
    it "applies keepalive settings from config" do
      new_configs = processes.pgcat.current_config
      general = new_configs["general"]
      general["tcp_keepalives_idle"] = 120
      general["tcp_keepalives_count"] = 1
      general["tcp_keepalives_interval"] = 1
      processes.pgcat.update_config(new_configs)

      # We need to kill the old process that was using the default configs
      processes.pgcat.stop
      processes.pgcat.start
      processes.pgcat.wait_until_ready

      established_database_sockets.each do |line|
        expect(line).to include("timer:(keepalive,1min")
      end
    end
  end
end
|
|
|
|
describe "Extended Protocol handling" do
  # Runs the given block and discards any StandardError it raises.
  #
  # The original code used the modifier form `... rescue PG::SystemError`.
  # A rescue modifier cannot name an exception class: it rescues every
  # StandardError and merely evaluates to the expression on its right.
  # This helper keeps that exact swallow-everything behaviour but states it
  # honestly.
  # NOTE(review): the intent was presumably to ignore only PG::SystemError;
  # narrowing the rescue would change which failures abort the example.
  def ignoring_errors
    yield
  rescue StandardError
    nil
  end

  it "does not send packets that client does not expect during extended protocol sequence" do
    new_configs = processes.pgcat.current_config

    new_configs["general"]["connect_timeout"] = 500
    new_configs["general"]["ban_time"] = 1
    new_configs["general"]["shutdown_timeout"] = 1
    new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1

    processes.pgcat.update_config(new_configs)
    processes.pgcat.reload_config

    # 25 background clients each run a 5 second query against a pool whose
    # size was just set to 1. The threads are intentionally not joined.
    25.times do
      Thread.new do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        ignoring_errors { conn.async_exec("SELECT pg_sleep(5)") }
      ensure
        conn&.close
      end
    end

    sleep(0.5)
    conn_under_test = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
    # Only stderr is inspected below; the captured stdout is not needed.
    _stdout, stderr = with_captured_stdout_stderr do
      15.times do |i|
        # Alternate simple-protocol and extended-protocol queries; failures
        # of the individual queries are ignored.
        ignoring_errors { conn_under_test.async_exec("SELECT 1") }
        ignoring_errors { conn_under_test.exec_params("SELECT #{i} + $1", [i]) }
        sleep 1
      end
    end

    raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
  end
end
|
|
|
|
describe "Pool recycling after config reload" do
  let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }

  # Swaps shards "0" and "1" in the config while a client sits inside a
  # transaction, and checks which database that client (and a new one) reaches.
  it "should update pools for new clients and clients that are no longer in transaction" do
    # Asks the server behind the given connection for its database name.
    current_database = lambda do |conn|
      conn.async_exec("SELECT current_database();")[0]["current_database"]
    end

    server_conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
    server_conn.async_exec("BEGIN")

    # No config change yet, client should see the old configs
    expect(current_database.call(server_conn)).to eq('shard0')

    # Swap shards
    new_config = processes.pgcat.current_config
    shards = new_config["pools"]["sharded_db"]["shards"]
    shards["0"], shards["1"] = shards["1"], shards["0"]

    # Reload config
    processes.pgcat.update_config(new_config)
    processes.pgcat.reload_config
    sleep 0.5

    # Config changed but transaction is in progress, client should still see the old configs
    expect(current_database.call(server_conn)).to eq('shard0')
    server_conn.async_exec("COMMIT")

    # Transaction finished, client should get new configs
    expect(current_database.call(server_conn)).to eq('shard1')

    # New connection should get new configs
    server_conn.close
    server_conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
    expect(current_database.call(server_conn)).to eq('shard1')
  end
end
|
|
|
|
describe "Clients closing connection in the middle of transaction" do
  # Opens a transaction on the primary, drops the client connection without
  # finishing it, and expects exactly one ROLLBACK to be counted on the primary.
  it "sends a rollback to the server" do
    client = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
    ["SET SERVER ROLE to 'primary'", "BEGIN"].each { |sql| client.async_exec(sql) }
    client.close

    rollback_count = processes.primary.count_query("ROLLBACK")
    expect(rollback_count).to eq(1)
  end
end
|
|
|
|
describe "Server version reporting" do
  # A server_version of 0 is treated as "not reported"; both the pooled
  # database connection and the admin connection must report something else.
  it "reports correct version for normal and admin databases" do
    pooled = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
    pooled_version = pooled.server_version
    expect(pooled_version).not_to eq(0)
    pooled.close

    admin = PG.connect(processes.pgcat.admin_connection_string)
    admin_version = admin.server_version
    expect(admin_version).not_to eq(0)
    admin.close
  end
end
|
|
|
|
describe "State clearance" do
|
|
context "session mode" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }

  it "Clears state before connection checkin" do
    # Both modes of operation should not raise
    # ERROR: prepared statement "prepared_q" already exists
    15.times do
      conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
      conn.close
    end

    conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
    conn.async_exec("SET statement_timeout to 1000")
    # Read the setting back once and assert on that value. The original
    # assigned two locals that were never used and re-ran the same SHOW
    # query inside the expectation.
    current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
    expect(current_value).to eq("1s")
    conn.close
  end

  it "Does not send DISCARD ALL unless necessary" do
    # Plain queries only: no DISCARD ALL is expected on the primary.
    10.times do
      conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      conn.async_exec("SET SERVER ROLE to 'primary'")
      conn.async_exec("SELECT 1")
      conn.close
    end

    expect(processes.primary.count_query("DISCARD ALL")).to eq(0)

    # Each of these sessions issues a SET, so one DISCARD ALL per session
    # is expected on the primary.
    10.times do
      conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      conn.async_exec("SET SERVER ROLE to 'primary'")
      conn.async_exec("SELECT 1")
      conn.async_exec("SET statement_timeout to 5000")
      conn.close
    end

    expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
  end
end
|
|
|
|
context "transaction mode" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }

  it "Clears state before connection checkin" do
    # Both modes of operation should not raise
    # ERROR: prepared statement "prepared_q" already exists

    # First through a SQL-level PREPARE ...
    15.times do
      client = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      client.async_exec("PREPARE prepared_q (int) AS SELECT $1")
      client.close
    end

    # ... then through the driver's prepare call, reusing the same statement name.
    15.times do
      client = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      client.prepare("prepared_q", "SELECT $1")
      client.close
    end
  end

  it "Does not send DISCARD ALL unless necessary" do
    # Queries that set no session state: no DISCARD ALL expected on the primary.
    10.times do
      client = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      ["SET SERVER ROLE to 'primary'", "SELECT 1"].each { |sql| client.async_exec(sql) }
      client.exec_params("SELECT $1", [1])
      client.close
    end

    discard_count = processes.primary.count_query("DISCARD ALL")
    expect(discard_count).to eq(0)

    # Each client issues a SET, so one DISCARD ALL per client is expected.
    10.times do
      client = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      [
        "SET SERVER ROLE to 'primary'",
        "SELECT 1",
        "SET statement_timeout to 5000"
      ].each { |sql| client.async_exec(sql) }
      client.close
    end

    discard_count = processes.primary.count_query("DISCARD ALL")
    expect(discard_count).to eq(10)
  end
end
|
|
|
|
context "transaction mode with transactions" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }

  it "Does not clear set statement state when declared in a transaction" do
    # The same scenario is run twice: once with a plain SET and once with
    # SET LOCAL, both wrapped in BEGIN/COMMIT. After each round no
    # DISCARD ALL is expected on the primary.
    set_statements = [
      "SET statement_timeout to 1000",
      "SET LOCAL statement_timeout to 1000"
    ]

    set_statements.each do |set_statement|
      10.times do
        client = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        client.async_exec("SET SERVER ROLE to 'primary'")
        client.async_exec("BEGIN")
        client.async_exec(set_statement)
        client.async_exec("COMMIT")
        client.close
      end
      expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
    end
  end
end
|
|
end
|
|
end
|