From aa89e357e0c90750ab742d6e62034364941664b4 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 10 Mar 2023 06:23:51 -0600 Subject: [PATCH] PgCat Query Mirroring (#341) This is an implementation of Query mirroring in PgCat (outlined in #302) In configs, we match mirror hosts with the servers handling the traffic. A mirror host will receive the same protocol messages as the main server it was matched with. This is done by creating an async task for each mirror server; the task communicates with the main server through two channels, one for the protocol messages and one for the exit signal. The mirror server sends the protocol packets to the underlying PostgreSQL server. We receive from the underlying PostgreSQL server as soon as the data is available and we immediately discard it. We use bb8 to manage the life cycle of the connection, not for pooling, since each mirror server handler is more or less single-threaded. We don't have any connection pooling in the mirrors. Matching each mirror connection to an actual server connection guarantees that we will not have more connections to any of the mirrors than the parent pool would allow. 
--- .circleci/config.yml | 8 +- .circleci/run_tests.sh | 6 +- .gitignore | 2 +- dev/docker-compose.yaml | 8 +- src/admin.rs | 1 - src/config.rs | 21 +++- src/lib.rs | 1 + src/main.rs | 1 + src/mirrors.rs | 169 ++++++++++++++++++++++++++++ src/pool.rs | 29 ++++- src/query_router.rs | 1 + src/server.rs | 27 +++++ tests/docker/docker-compose.yml | 8 +- tests/ruby/helpers/pg_instance.rb | 12 ++ tests/ruby/helpers/pgcat_process.rb | 8 +- tests/ruby/load_balancing_spec.rb | 1 - tests/ruby/mirrors_spec.rb | 90 +++++++++++++++ 17 files changed, 370 insertions(+), 23 deletions(-) create mode 100644 src/mirrors.rs create mode 100644 tests/ruby/mirrors_spec.rb diff --git a/.circleci/config.yml b/.circleci/config.yml index f7aa899..a43d4bd 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -18,28 +18,28 @@ jobs: RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Copt-level=0 -Clink-dead-code -Coverflow-checks=off -Zpanic_abort_tests -Cpanic=abort -Cinstrument-coverage" RUSTDOCFLAGS: "-Cpanic=abort" - image: postgres:14 - command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5 - image: postgres:14 - command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - image: postgres:14 - command: ["postgres", "-p", "8432", "-c", 
"shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - image: postgres:14 - command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"] + command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] environment: POSTGRES_USER: postgres POSTGRES_DB: postgres diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 644f22e..a5cfab0 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -92,12 +92,12 @@ sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml kill -SIGHUP $(pgrep pgcat) # Reload config again # -# ActiveRecord tests +# Integration tests and ActiveRecord tests # cd tests/ruby sudo bundle install -bundle exec ruby tests.rb || exit 1 -bundle exec rspec *_spec.rb || exit 1 +bundle exec ruby tests.rb --format documentation || exit 1 +bundle exec rspec *_spec.rb --format documentation || exit 1 cd ../.. 
# diff --git a/.gitignore b/.gitignore index b3ca013..0b43616 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ /target *.deb .vscode -.profraw +*.profraw cov/ lcov.info diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index ee609e0..da75938 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -33,7 +33,7 @@ services: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5 PGPORT: 5432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"] + command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg2: <<: *common-definition-pg @@ -41,21 +41,21 @@ services: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 PGPORT: 7432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"] + command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg3: <<: *common-definition-pg environment: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 PGPORT: 8432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"] + command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg4: <<: *common-definition-pg environment: <<: *common-env-pg POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 PGPORT: 9432 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", 
"-p", "9432"] + command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] toxiproxy: build: . diff --git a/src/admin.rs b/src/admin.rs index c90f28e..feea3a1 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,4 +1,3 @@ -use crate::config::Role; use crate::pool::BanReason; /// Admin database. use bytes::{Buf, BufMut, BytesMut}; diff --git a/src/config.rs b/src/config.rs index 517cabc..9b90ebe 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,6 +29,8 @@ pub enum Role { Primary, #[serde(alias = "replica", alias = "Replica")] Replica, + #[serde(alias = "mirror", alias = "Mirror")] + Mirror, } impl ToString for Role { @@ -36,6 +38,7 @@ impl ToString for Role { match *self { Role::Primary => "primary".to_string(), Role::Replica => "replica".to_string(), + Role::Mirror => "mirror".to_string(), } } } @@ -90,6 +93,9 @@ pub struct Address { /// The name of this pool (i.e. database name visible to the client). pub pool_name: String, + + /// List of addresses to receive mirrored traffic. + pub mirrors: Vec
, } impl Default for Address { @@ -105,6 +111,7 @@ impl Default for Address { role: Role::Replica, username: String::from("username"), pool_name: String::from("pool_name"), + mirrors: Vec::new(), } } } @@ -114,11 +121,14 @@ impl Address { pub fn name(&self) -> String { match self.role { Role::Primary => format!("{}_shard_{}_primary", self.pool_name, self.shard), - Role::Replica => format!( "{}_shard_{}_replica_{}", self.pool_name, self.shard, self.replica_number ), + Role::Mirror => format!( + "{}_shard_{}_mirror_{}", + self.pool_name, self.shard, self.replica_number + ), } } } @@ -465,11 +475,19 @@ pub struct ServerConfig { pub role: Role, } +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)] +pub struct MirrorServerConfig { + pub host: String, + pub port: u16, + pub mirroring_target_index: usize, +} + /// Shard configuration. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Hash, Eq)] pub struct Shard { pub database: String, pub servers: Vec, + pub mirrors: Option>, } impl Shard { @@ -518,6 +536,7 @@ impl Default for Shard { port: 5432, role: Role::Primary, }], + mirrors: None, database: String::from("postgres"), } } diff --git a/src/lib.rs b/src/lib.rs index 63eae59..67aa9cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod constants; pub mod errors; pub mod messages; +pub mod mirrors; pub mod multi_logger; pub mod pool; pub mod scram; diff --git a/src/main.rs b/src/main.rs index b3ef77c..e2ff5d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,7 @@ mod config; mod constants; mod errors; mod messages; +mod mirrors; mod multi_logger; mod pool; mod prometheus; diff --git a/src/mirrors.rs b/src/mirrors.rs new file mode 100644 index 0000000..6a59172 --- /dev/null +++ b/src/mirrors.rs @@ -0,0 +1,169 @@ +/// A mirrored PostgreSQL client. +/// Packets arrive to us through a channel from the main client and we send them to the server. 
+use bb8::Pool; +use bytes::{Bytes, BytesMut}; + +use crate::config::{get_config, Address, Role, User}; +use crate::pool::{ClientServerMap, ServerPool}; +use crate::stats::get_reporter; +use log::{error, info, trace, warn}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +pub struct MirroredClient { + address: Address, + user: User, + database: String, + bytes_rx: Receiver, + disconnect_rx: Receiver<()>, +} + +impl MirroredClient { + async fn create_pool(&self) -> Pool { + let config = get_config(); + let default = std::time::Duration::from_millis(10_000).as_millis() as u64; + let (connection_timeout, idle_timeout) = match config.pools.get(&self.address.pool_name) { + Some(cfg) => ( + cfg.connect_timeout.unwrap_or(default), + cfg.idle_timeout.unwrap_or(default), + ), + None => (default, default), + }; + + let manager = ServerPool::new( + self.address.clone(), + self.user.clone(), + self.database.as_str(), + ClientServerMap::default(), + get_reporter(), + ); + + Pool::builder() + .max_size(1) + .connection_timeout(std::time::Duration::from_millis(connection_timeout)) + .idle_timeout(Some(std::time::Duration::from_millis(idle_timeout))) + .test_on_check_out(false) + .build(manager) + .await + .unwrap() + } + + pub fn start(mut self) { + tokio::spawn(async move { + let pool = self.create_pool().await; + let address = self.address.clone(); + loop { + let mut server = match pool.get().await { + Ok(server) => server, + Err(err) => { + error!( + "Failed to get connection from pool, Discarding message {:?}, {:?}", + err, + address.clone() + ); + continue; + } + }; + + tokio::select! 
{ + // Exit channel events + _ = self.disconnect_rx.recv() => { + info!("Got mirror exit signal, exiting {:?}", address.clone()); + break; + } + + // Incoming data from server (we read to clear the socket buffer and discard the data) + recv_result = server.recv() => { + match recv_result { + Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()), + Err(err) => { + server.mark_bad(); + error!("Failed to receive from mirror {:?} {:?}", err, address.clone()); + } + } + } + + // Messages to send to the server + message = self.bytes_rx.recv() => { + match message { + Some(bytes) => { + match server.send(&BytesMut::from(&bytes[..])).await { + Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()), + Err(err) => { + server.mark_bad(); + error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()) + } + } + } + None => { + info!("Mirror channel closed, exiting {:?}", address.clone()); + break; + }, + } + } + } + } + }); + } +} +pub struct MirroringManager { + pub byte_senders: Vec>, + pub disconnect_senders: Vec>, +} +impl MirroringManager { + pub fn from_addresses( + user: User, + database: String, + addresses: Vec
, + ) -> MirroringManager { + let mut byte_senders: Vec> = vec![]; + let mut exit_senders: Vec> = vec![]; + + addresses.iter().for_each(|mirror| { + let (bytes_tx, bytes_rx) = channel::(500); + let (exit_tx, exit_rx) = channel::<()>(1); + let mut addr = mirror.clone(); + addr.role = Role::Mirror; + let client = MirroredClient { + user: user.clone(), + database: database.to_owned(), + address: addr, + bytes_rx, + disconnect_rx: exit_rx, + }; + exit_senders.push(exit_tx.clone()); + byte_senders.push(bytes_tx.clone()); + client.start(); + }); + + Self { + byte_senders: byte_senders, + disconnect_senders: exit_senders, + } + } + + pub fn send(self: &mut Self, bytes: &BytesMut) { + let cpy = bytes.clone().freeze(); + self.byte_senders + .iter_mut() + .for_each(|sender| match sender.try_send(cpy.clone()) { + Ok(_) => {} + Err(err) => { + warn!("Failed to send bytes to a mirror channel {}", err); + } + }); + } + + pub fn disconnect(self: &mut Self) { + self.disconnect_senders + .iter_mut() + .for_each(|sender| match sender.try_send(()) { + Ok(_) => {} + Err(err) => { + warn!( + "Failed to send disconnect signal to a mirror channel {}", + err + ); + } + }); + } +} diff --git a/src/pool.rs b/src/pool.rs index 0a0a53f..3a6ec3e 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -193,7 +193,7 @@ impl ConnectionPool { let config = get_config(); let mut new_pools = HashMap::new(); - let mut address_id = 0; + let mut address_id: usize = 0; for (pool_name, pool_config) in &config.pools { let new_pool_hash_value = pool_config.hash_value(); @@ -244,7 +244,33 @@ impl ConnectionPool { let mut servers = Vec::new(); let mut replica_number = 0; + // Load Mirror settings for (address_index, server) in shard.servers.iter().enumerate() { + let mut mirror_addresses = vec![]; + if let Some(mirror_settings_vec) = &shard.mirrors { + for (mirror_idx, mirror_settings) in + mirror_settings_vec.iter().enumerate() + { + if mirror_settings.mirroring_target_index != address_index { + continue; + } + 
mirror_addresses.push(Address { + id: address_id, + database: shard.database.clone(), + host: mirror_settings.host.clone(), + port: mirror_settings.port, + role: server.role, + address_index: mirror_idx, + replica_number, + shard: shard_idx.parse::().unwrap(), + username: user.username.clone(), + pool_name: pool_name.clone(), + mirrors: vec![], + }); + address_id += 1; + } + } + let address = Address { id: address_id, database: shard.database.clone(), @@ -256,6 +282,7 @@ impl ConnectionPool { shard: shard_idx.parse::().unwrap(), username: user.username.clone(), pool_name: pool_name.clone(), + mirrors: mirror_addresses, }; address_id += 1; diff --git a/src/query_router.rs b/src/query_router.rs index fff5bba..fbff68e 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -223,6 +223,7 @@ impl QueryRouter { Command::ShowServerRole => match self.active_role { Some(Role::Primary) => Role::Primary.to_string(), Some(Role::Replica) => Role::Replica.to_string(), + Some(Role::Mirror) => Role::Mirror.to_string(), None => { if self.query_parser_enabled() { String::from("auto") diff --git a/src/server.rs b/src/server.rs index 1d9bcd1..b3dbd6f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -14,6 +14,7 @@ use crate::config::{Address, User}; use crate::constants::*; use crate::errors::Error; use crate::messages::*; +use crate::mirrors::MirroringManager; use crate::pool::ClientServerMap; use crate::scram::ScramSha256; use crate::stats::Reporter; @@ -68,6 +69,8 @@ pub struct Server { // Last time that a successful server send or response happened last_activity: SystemTime, + + mirror_manager: Option, } impl Server { @@ -334,6 +337,14 @@ impl Server { stats, application_name: String::new(), last_activity: SystemTime::now(), + mirror_manager: match address.mirrors.len() { + 0 => None, + _ => Some(MirroringManager::from_addresses( + user.clone(), + database.to_owned(), + address.mirrors.clone(), + )), + }, }; server.set_name("pgcat").await?; @@ -384,6 +395,7 @@ impl Server { 
/// Send messages to the server from the client. pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> { + self.mirror_send(messages); self.stats.data_sent(messages.len(), self.server_id); match write_all_half(&mut self.write, messages).await { @@ -674,6 +686,20 @@ impl Server { pub fn mark_dirty(&mut self) { self.needs_cleanup = true; } + + pub fn mirror_send(&mut self, bytes: &BytesMut) { + match self.mirror_manager.as_mut() { + Some(manager) => manager.send(bytes), + None => (), + } + } + + pub fn mirror_disconnect(&mut self) { + match self.mirror_manager.as_mut() { + Some(manager) => manager.disconnect(), + None => (), + } + } } impl Drop for Server { @@ -681,6 +707,7 @@ impl Drop for Server { /// the socket is in non-blocking mode, so it may not be ready /// for a write. fn drop(&mut self) { + self.mirror_disconnect(); self.stats.server_disconnecting(self.server_id); let mut bytes = BytesMut::with_capacity(4); diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index e44dc52..e57d852 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -8,7 +8,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"] + command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg2: image: postgres:14 network_mode: "service:main" @@ -17,7 +17,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"] + command: ["postgres", "-p", "7432", "-c", 
"shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg3: image: postgres:14 network_mode: "service:main" @@ -26,7 +26,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"] + command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg4: image: postgres:14 network_mode: "service:main" @@ -35,7 +35,7 @@ services: POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 - command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"] + command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] main: build: . 
command: ["bash", "/app/tests/docker/run.sh"] diff --git a/tests/ruby/helpers/pg_instance.rb b/tests/ruby/helpers/pg_instance.rb index 3116457..a382824 100644 --- a/tests/ruby/helpers/pg_instance.rb +++ b/tests/ruby/helpers/pg_instance.rb @@ -38,6 +38,8 @@ class PgInstance def reset reset_toxics reset_stats + drop_connections + sleep 0.1 end def toxiproxy @@ -66,12 +68,22 @@ class PgInstance def reset_toxics Toxiproxy[@toxiproxy_name].toxics.each(&:destroy) + sleep 0.1 end def reset_stats with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") } end + def drop_connections + username = with_connection { |c| c.async_exec("SELECT current_user")[0]["current_user"] } + with_connection { |c| c.async_exec("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename='#{username}'") } + end + + def count_connections + with_connection { |c| c.async_exec("SELECT COUNT(*) as count FROM pg_stat_activity")[0]["count"].to_i } + end + def count_query(query) with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i } end diff --git a/tests/ruby/helpers/pgcat_process.rb b/tests/ruby/helpers/pgcat_process.rb index 2108eaf..6120c99 100644 --- a/tests/ruby/helpers/pgcat_process.rb +++ b/tests/ruby/helpers/pgcat_process.rb @@ -29,7 +29,7 @@ class PgcatProcess else '../../target/debug/pgcat' end - + @command = "#{command_path} #{@config_filename}" FileUtils.cp("../../pgcat.toml", @config_filename) @@ -48,12 +48,14 @@ class PgcatProcess @original_config = current_config output_to_write = TOML::Generator.new(config_hash).body output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,') + output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*\]/, ',\1]') File.write(@config_filename, output_to_write) end def current_config - old_cfg = File.read(@config_filename) - loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",') + loadable_string = 
File.read(@config_filename) + loadable_string = loadable_string.gsub(/,\s*(\d+)\s*,/, ', "\1",') + loadable_string = loadable_string.gsub(/,\s*(\d+)\s*\]/, ', "\1"]') TOML.load(loadable_string) end diff --git a/tests/ruby/load_balancing_spec.rb b/tests/ruby/load_balancing_spec.rb index e7b89ee..cd64740 100644 --- a/tests/ruby/load_balancing_spec.rb +++ b/tests/ruby/load_balancing_spec.rb @@ -46,7 +46,6 @@ describe "Random Load Balancing" do end end - expect(failed_count).to be <= 2 processes.all_databases.each do |instance| queries_routed = instance.count_select_1_plus_2 if processes.replicas[0..1].include?(instance) diff --git a/tests/ruby/mirrors_spec.rb b/tests/ruby/mirrors_spec.rb new file mode 100644 index 0000000..801df28 --- /dev/null +++ b/tests/ruby/mirrors_spec.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true +require 'uri' +require_relative 'spec_helper' + +describe "Query Mirroing" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + let(:mirror_pg) { PgInstance.new(8432, "sharding_user", "sharding_user", "shard2")} + let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") } + let(:mirror_host) { "localhost" } + + before do + new_configs = processes.pgcat.current_config + new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [ + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + ] + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + end + + after do + processes.all_databases.map(&:reset) + mirror_pg.reset + processes.pgcat.shutdown + end + + it "can mirror a query" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + runs = 15 + runs.times { conn.async_exec("SELECT 1 + 2") } + sleep 0.5 + expect(processes.all_databases.first.count_select_1_plus_2).to eq(runs) + expect(mirror_pg.count_select_1_plus_2).to eq(runs * 3) + end + + context "when 
main server connection is closed" do + it "closes the mirror connection" do + baseline_count = processes.all_databases.first.count_connections + 5.times do |i| + # Force pool cycling to detect zombie mirror connections + new_configs = processes.pgcat.current_config + new_configs["pools"]["sharded_db"]["idle_timeout"] = 5000 + i + new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [ + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + [mirror_host, mirror_pg.port.to_s, "0"], + ] + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + end + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SELECT 1 + 2") + sleep 0.5 + # Expect same number of connection even after pool cycling + expect(processes.all_databases.first.count_connections).to be < baseline_count + 2 + end + end + + xcontext "when mirror server goes down temporarily" do + it "continues to transmit queries after recovery" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + mirror_pg.take_down do + conn.async_exec("SELECT 1 + 2") + sleep 0.1 + end + 10.times { conn.async_exec("SELECT 1 + 2") } + sleep 1 + expect(mirror_pg.count_select_1_plus_2).to be >= 2 + end + end + + context "when a mirror is down" do + let(:mirror_host) { "badhost" } + + it "does not fail to send the main query" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + # No Errors here + conn.async_exec("SELECT 1 + 2") + expect(processes.all_databases.first.count_select_1_plus_2).to eq(1) + end + + it "does not fail to send the main query (even after thousands of mirror attempts)" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + # No Errors here + 1000.times { conn.async_exec("SELECT 1 + 2") } + expect(processes.all_databases.first.count_select_1_plus_2).to eq(1000) + end + end +end