Files
pgcat/tests/ruby/mirrors_spec.rb
Mostafa Abdelraouf aa89e357e0 PgCat Query Mirroring (#341)
This is an implementation of Query mirroring in PgCat (outlined here #302)

In configs, we match mirror hosts with the servers handling the traffic. A mirror host will receive the same protocol messages as the main server it was matched with.

This is done by creating an async task for each mirror server, it communicates with the main server through two channels, one for the protocol messages and one for the exit signal. The mirror server sends the protocol packets to the underlying PostgreSQL server. We receive from the underlying PostgreSQL server as soon as the data is available and we immediately discard it. We use bb8 to manage the life cycle of the connection, not for pooling since each mirror server handler is more or less single-threaded.

We don't have any connection pooling in the mirrors. Matching each mirror connection to an actual server connection guarantees that we will not have more connections to any of the mirrors than the parent pool would allow.
2023-03-10 06:23:51 -06:00

91 lines
3.3 KiB
Ruby

# frozen_string_literal: true
require 'uri'
require_relative 'spec_helper'
describe "Query Mirroing" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
let(:mirror_pg) { PgInstance.new(8432, "sharding_user", "sharding_user", "shard2")}
let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }
let(:mirror_host) { "localhost" }
before do
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
]
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
end
after do
processes.all_databases.map(&:reset)
mirror_pg.reset
processes.pgcat.shutdown
end
it "can mirror a query" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
runs = 15
runs.times { conn.async_exec("SELECT 1 + 2") }
sleep 0.5
expect(processes.all_databases.first.count_select_1_plus_2).to eq(runs)
expect(mirror_pg.count_select_1_plus_2).to eq(runs * 3)
end
context "when main server connection is closed" do
it "closes the mirror connection" do
baseline_count = processes.all_databases.first.count_connections
5.times do |i|
# Force pool cycling to detect zombie mirror connections
new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["idle_timeout"] = 5000 + i
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
[mirror_host, mirror_pg.port.to_s, "0"],
]
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
end
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SELECT 1 + 2")
sleep 0.5
# Expect same number of connection even after pool cycling
expect(processes.all_databases.first.count_connections).to be < baseline_count + 2
end
end
xcontext "when mirror server goes down temporarily" do
it "continues to transmit queries after recovery" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
mirror_pg.take_down do
conn.async_exec("SELECT 1 + 2")
sleep 0.1
end
10.times { conn.async_exec("SELECT 1 + 2") }
sleep 1
expect(mirror_pg.count_select_1_plus_2).to be >= 2
end
end
context "when a mirror is down" do
let(:mirror_host) { "badhost" }
it "does not fail to send the main query" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
# No Errors here
conn.async_exec("SELECT 1 + 2")
expect(processes.all_databases.first.count_select_1_plus_2).to eq(1)
end
it "does not fail to send the main query (even after thousands of mirror attempts)" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
# No Errors here
1000.times { conn.async_exec("SELECT 1 + 2") }
expect(processes.all_databases.first.count_select_1_plus_2).to eq(1000)
end
end
end