From 7894bba59b8321ffdfbc9c461b0e3450043e0c66 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Tue, 17 Jan 2023 06:52:18 -0600 Subject: [PATCH] Introduce least-outstanding-connections load balancing (#282) Least outstanding connections load balancing can improve the load distribution between instances but for Pgcat it may also improve handling slow replicas that don't go completely down. With LoC, traffic will quickly move away from the slow replica without waiting for the replica to be banned. If all replicas slow down equally (due to a bad query that is hitting all replicas), the algorithm will degenerate to Random Load Balancing (which is what we had in Pgcat until today). This may also allow Pgcat to accommodate pools with differently-sized replicas. --- src/config.rs | 36 +++++++++- src/pool.rs | 32 ++++++++- src/query_router.rs | 1 + tests/ruby/helpers/pgcat_helper.rb | 9 ++- tests/ruby/load_balancing_spec.rb | 106 ++++++++++++++++++++++++++++- 5 files changed, 177 insertions(+), 7 deletions(-) diff --git a/src/config.rs b/src/config.rs index e8be947..219f0de 100644 --- a/src/config.rs +++ b/src/config.rs @@ -264,7 +264,6 @@ pub enum PoolMode { #[serde(alias = "session", alias = "Session")] Session, } - impl ToString for PoolMode { fn to_string(&self) -> String { match *self { @@ -274,11 +273,33 @@ impl ToString for PoolMode { } } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)] +pub enum LoadBalancingMode { + #[serde(alias = "random", alias = "Random")] + Random, + + #[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")] + LeastOutstandingConnections, +} +impl ToString for LoadBalancingMode { + fn to_string(&self) -> String { + match *self { + LoadBalancingMode::Random => "random".to_string(), + LoadBalancingMode::LeastOutstandingConnections => { + "least_outstanding_connections".to_string() + } + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] pub struct Pool { 
#[serde(default = "Pool::default_pool_mode")] pub pool_mode: PoolMode, + #[serde(default = "Pool::default_load_balancing_mode")] + pub load_balancing_mode: LoadBalancingMode, + pub default_role: String, #[serde(default)] // False @@ -305,6 +326,10 @@ impl Pool { PoolMode::Transaction } + pub fn default_load_balancing_mode() -> LoadBalancingMode { + LoadBalancingMode::Random + } + pub fn default_automatic_sharding_key() -> Option { None } @@ -345,6 +370,7 @@ impl Default for Pool { fn default() -> Pool { Pool { pool_mode: Self::default_pool_mode(), + load_balancing_mode: Self::default_load_balancing_mode(), shards: BTreeMap::from([(String::from("1"), Shard::default())]), users: BTreeMap::default(), default_role: String::from("any"), @@ -471,6 +497,10 @@ impl From<&Config> for std::collections::HashMap { format!("pools.{}.pool_mode", pool_name), pool.pool_mode.to_string(), ), + ( + format!("pools.{}.load_balancing_mode", pool_name), + pool.load_balancing_mode.to_string(), + ), ( format!("pools.{}.primary_reads_enabled", pool_name), pool.primary_reads_enabled.to_string(), @@ -594,6 +624,10 @@ impl Config { "[pool: {}] Pool mode: {:?}", pool_name, pool_config.pool_mode ); + info!( + "[pool: {}] Load Balancing mode: {:?}", + pool_name, pool_config.load_balancing_mode + ); let connect_timeout = match pool_config.connect_timeout { Some(connect_timeout) => connect_timeout, None => self.general.connect_timeout, diff --git a/src/pool.rs b/src/pool.rs index 94f6962..82720aa 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -12,7 +12,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; -use crate::config::{get_config, Address, General, PoolMode, Role, User}; +use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User}; use crate::errors::Error; use crate::server::Server; @@ -62,6 +62,9 @@ pub struct PoolSettings { /// Transaction or Session. pub pool_mode: PoolMode, + /// Random or LeastOutstandingConnections. 
+ pub load_balancing_mode: LoadBalancingMode, + // Number of shards. pub shards: usize, @@ -94,6 +97,7 @@ impl Default for PoolSettings { fn default() -> PoolSettings { PoolSettings { pool_mode: PoolMode::Transaction, + load_balancing_mode: LoadBalancingMode::Random, shards: 1, user: User::default(), default_role: None, @@ -257,6 +261,7 @@ impl ConnectionPool { server_info: BytesMut::new(), settings: PoolSettings { pool_mode: pool_config.pool_mode, + load_balancing_mode: pool_config.load_balancing_mode, // shards: pool_config.shards.clone(), shards: shard_ids.len(), user: user.clone(), @@ -356,8 +361,17 @@ impl ConnectionPool { .filter(|address| address.role == role) .collect(); - // Random load balancing + // We shuffle even if least_outstanding_connections is used to avoid imbalance + // in cases where all candidates have more or less the same number of outstanding + // connections candidates.shuffle(&mut thread_rng()); + if self.settings.load_balancing_mode == LoadBalancingMode::LeastOutstandingConnections { + candidates.sort_by(|a, b| { + self.busy_connection_count(b) + .partial_cmp(&self.busy_connection_count(a)) + .unwrap() + }); + } while !candidates.is_empty() { // Get the next candidate @@ -565,6 +579,20 @@ impl ConnectionPool { pub fn server_info(&self) -> BytesMut { self.server_info.clone() } + + fn busy_connection_count(&self, address: &Address) -> u32 { + let state = self.pool_state(address.shard, address.address_index); + let idle = state.idle_connections; + let provisioned = state.connections; + + if idle > provisioned { + // Unlikely but avoids an overflow panic if this ever happens + return 0; + } + let busy = provisioned - idle; + debug!("{:?} has {:?} busy connections", address, busy); + return busy; + } } /// Wrapper for the bb8 connection pool. 
diff --git a/src/query_router.rs b/src/query_router.rs index 5090571..03f4601 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -768,6 +768,7 @@ mod test { let pool_settings = PoolSettings { pool_mode: PoolMode::Transaction, + load_balancing_mode: crate::config::LoadBalancingMode::Random, shards: 2, user: crate::config::User::default(), default_role: Some(Role::Replica), diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb index 55847ed..ffa6095 100644 --- a/tests/ruby/helpers/pgcat_helper.rb +++ b/tests/ruby/helpers/pgcat_helper.rb @@ -5,7 +5,7 @@ require_relative 'pg_instance' module Helpers module Pgcat - def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction") + def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random") user = { "password" => "sharding_user", "pool_size" => pool_size, @@ -23,6 +23,7 @@ module Helpers "#{pool_name}" => { "default_role" => "any", "pool_mode" => pool_mode, + "load_balancing_mode" => lb_mode, "primary_reads_enabled" => false, "query_parser_enabled" => false, "sharding_function" => "pg_bigint_hash", @@ -46,7 +47,7 @@ module Helpers end end - def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction") + def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random") user = { "password" => "sharding_user", "pool_size" => pool_size, @@ -64,6 +65,7 @@ module Helpers "#{pool_name}" => { "default_role" => "primary", "pool_mode" => pool_mode, + "load_balancing_mode" => lb_mode, "primary_reads_enabled" => false, "query_parser_enabled" => false, "sharding_function" => "pg_bigint_hash", @@ -90,7 +92,7 @@ module Helpers end end - def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction") + def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random") user = { "password" => "sharding_user", "pool_size" => pool_size, @@ -111,6 +113,7 @@ module Helpers 
"#{pool_name}" => { "default_role" => "any", "pool_mode" => pool_mode, + "load_balancing_mode" => lb_mode, "primary_reads_enabled" => false, "query_parser_enabled" => false, "sharding_function" => "pg_bigint_hash", diff --git a/tests/ruby/load_balancing_spec.rb b/tests/ruby/load_balancing_spec.rb index bd98a83..8be066d 100644 --- a/tests/ruby/load_balancing_spec.rb +++ b/tests/ruby/load_balancing_spec.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true require_relative 'spec_helper' -describe "Load Balancing" do +describe "Random Load Balancing" do let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) } after do processes.all_databases.map(&:reset) @@ -59,3 +59,107 @@ describe "Load Balancing" do end end +describe "Least Outstanding Queries Load Balancing" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") } + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + context "under homogenous load" do + it "balances query volume between all instances" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + + query_count = QUERY_COUNT + expected_share = query_count / processes.all_databases.count + failed_count = 0 + + query_count.times do + conn.async_exec("SELECT 1 + 2") + rescue + failed_count += 1 + end + + expect(failed_count).to eq(0) + processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share| + expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + end + end + end + + context "under heterogeneous load" do + it "balances query volume between all instances based on how busy they are" do + slow_query_count = 2 + threads = Array.new(slow_query_count) do + Thread.new do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SELECT pg_sleep(1)") + end + end + + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) 
+ + query_count = QUERY_COUNT + expected_share = query_count / (processes.all_databases.count - slow_query_count) + failed_count = 0 + + query_count.times do + conn.async_exec("SELECT 1 + 2") + rescue + failed_count += 1 + end + + expect(failed_count).to eq(0) + # Under LOQ, we expect replicas running the slow pg_sleep + # to get no selects + expect( + processes. + all_databases. + map(&:count_select_1_plus_2). + count { |instance_share| instance_share == 0 } + ).to eq(slow_query_count) + + # We also expect the quick queries to be spread across + # the idle servers only + processes. + all_databases. + map(&:count_select_1_plus_2). + reject { |instance_share| instance_share == 0 }. + each do |instance_share| + expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + end + + threads.map(&:join) + end + end + + context "when some replicas are down" do + it "balances query volume between working instances" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + expected_share = QUERY_COUNT / (processes.all_databases.count - 2) + failed_count = 0 + + processes[:replicas][0].take_down do + processes[:replicas][1].take_down do + QUERY_COUNT.times do + conn.async_exec("SELECT 1 + 2") + rescue + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count += 1 + end + end + end + + expect(failed_count).to eq(2) + processes.all_databases.each do |instance| + queries_routed = instance.count_select_1_plus_2 + if processes.replicas[0..1].include?(instance) + expect(queries_routed).to eq(0) + else + expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + end + end + end + end +end +