# frozen_string_literal: true

require_relative 'spec_helper'
# Verifies pgcat's `random` load-balancing mode: query volume should be
# spread roughly evenly across all replicas, and traffic should route
# around replicas that are taken down.
describe "Random Load Balancing" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }

  after do
    # Clear per-instance query counters, then stop the pooler.
    processes.all_databases.map(&:reset)
    processes.pgcat.shutdown
  end

  context "under regular circumstances" do
    it "balances query volume between all instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

      query_count = QUERY_COUNT
      # With uniform random routing each instance should see ~1/N of the load.
      expected_share = query_count / processes.all_databases.count
      failed_count = 0

      query_count.times do
        conn.async_exec("SELECT 1 + 2")
      rescue
        failed_count += 1
      end

      # No replica is down, so every query should succeed.
      expect(failed_count).to eq(0)
      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
      end
    end
  end

  context "when some replicas are down" do
    it "balances query volume between working instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      # Two replicas are taken down below, so the survivors split the load.
      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
      failed_count = 0

      processes[:replicas][0].take_down do
        processes[:replicas][1].take_down do
          QUERY_COUNT.times do
            conn.async_exec("SELECT 1 + 2")
          rescue
            # An in-flight query can die with the replica; reconnect
            # through the pooler and count the failure.
            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
            failed_count += 1
          end
        end
      end

      # At most one failure per downed replica is tolerated.
      expect(failed_count).to be <= 2

      processes.all_databases.each do |instance|
        queries_routed = instance.count_select_1_plus_2
        if processes.replicas[0..1].include?(instance)
          # Downed replicas must receive no traffic at all.
          expect(queries_routed).to eq(0)
        else
          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end
  end
end
# Verifies pgcat's least-outstanding-queries ("loc") load-balancing mode:
# even load spreads evenly, busy replicas are avoided, and downed replicas
# receive no traffic.
describe "Least Outstanding Queries Load Balancing" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") }

  after do
    # Clear per-instance query counters, then stop the pooler.
    processes.all_databases.map(&:reset)
    processes.pgcat.shutdown
  end

  context "under homogenous load" do
    it "balances query volume between all instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

      query_count = QUERY_COUNT
      # When all queries cost the same, LOQ degenerates to an even split.
      expected_share = query_count / processes.all_databases.count
      failed_count = 0

      query_count.times do
        conn.async_exec("SELECT 1 + 2")
      rescue
        failed_count += 1
      end

      # No replica is down, so every query should succeed.
      expect(failed_count).to eq(0)
      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
      end
    end
  end

  context "under heterogeneous load" do
    # NOTE(review): pending (xit) in the original — kept pending here.
    xit "balances query volume between all instances based on how busy they are" do
      slow_query_count = 2
      # Tie up `slow_query_count` server connections with open transactions
      # so those replicas look busy to the LOQ balancer.
      threads = Array.new(slow_query_count) do
        Thread.new do
          conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("BEGIN")
        end
      end

      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

      query_count = QUERY_COUNT
      # Quick queries should only land on the idle replicas.
      expected_share = query_count / (processes.all_databases.count - slow_query_count)
      failed_count = 0

      query_count.times do
        conn.async_exec("SELECT 1 + 2")
      rescue
        failed_count += 1
      end

      expect(failed_count).to eq(0)

      # Under LOQ, we expect replicas running the slow pg_sleep
      # to get no selects
      expect(
        processes.
          all_databases.
          map(&:count_select_1_plus_2).
          count { |instance_share| instance_share == 0 }
      ).to eq(slow_query_count)

      # We also expect the quick queries to be spread across
      # the idle servers only
      processes.
        all_databases.
        map(&:count_select_1_plus_2).
        reject { |instance_share| instance_share == 0 }.
        each do |instance_share|
          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end

      threads.map(&:join)
    end
  end

  context "when some replicas are down" do
    it "balances query volume between working instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      # Two replicas are taken down below, so the survivors split the load.
      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
      failed_count = 0

      processes[:replicas][0].take_down do
        processes[:replicas][1].take_down do
          QUERY_COUNT.times do
            conn.async_exec("SELECT 1 + 2")
          rescue
            # An in-flight query can die with the replica; reconnect
            # through the pooler and count the failure.
            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
            failed_count += 1
          end
        end
      end

      # Exactly one failure per downed replica is expected here.
      expect(failed_count).to eq(2)

      processes.all_databases.each do |instance|
        queries_routed = instance.count_select_1_plus_2
        if processes.replicas[0..1].include?(instance)
          # Downed replicas must receive no traffic at all.
          expect(queries_routed).to eq(0)
        else
          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end
  end
end