From 6db51b4a11ca6a4fb8b6e0237c591fe51ce10ce6 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 24 Feb 2022 20:55:19 -0800 Subject: [PATCH] Use Toxiproxy for failover testing (#44) * Toxiproxy * up-to-date config * debug * hm * more * mroe * more * hmm * aha * less logs * cleaner * hmm * we test these now * update readme --- .circleci/pgcat.toml | 108 +++++++++++++++++++++++++++++++++++++++++ .circleci/run_tests.sh | 61 +++++++++++++++++------ README.md | 4 +- src/client.rs | 2 +- 4 files changed, 158 insertions(+), 17 deletions(-) create mode 100644 .circleci/pgcat.toml diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml new file mode 100644 index 0000000..370b06b --- /dev/null +++ b/.circleci/pgcat.toml @@ -0,0 +1,108 @@ +# +# PgCat config example. +# + +# +# General pooler settings +[general] + +# What IP to run on, 0.0.0.0 means accessible from everywhere. +host = "0.0.0.0" + +# Port to run on, same as PgBouncer used in this example. +port = 6432 + +# How many connections to allocate per server. +pool_size = 15 + +# Pool mode (see PgBouncer docs for more). +# session: one server connection per connected client +# transaction: one server connection per client transaction +pool_mode = "transaction" + +# How long to wait before aborting a server connection (ms). +connect_timeout = 100 + +# How much time to give `SELECT 1` health check query to return with a result (ms). +healthcheck_timeout = 100 + +# For how long to ban a server if it fails a health check (seconds). +ban_time = 60 # Seconds + +# Stats will be sent here +statsd_address = "127.0.0.1:8125" + +# +# User to use for authentication against the server. +[user] +name = "sharding_user" +password = "sharding_user" + + +# +# Shards in the cluster +[shards] + +# Shard 0 +[shards.0] + +# [ host, port, role ] +servers = [ + [ "127.0.0.1", 5432, "primary" ], + [ "localhost", 5433, "replica" ], + # [ "127.0.1.1", 5432, "replica" ], +] +# Database name (e.g. "postgres") +database = "shard0" + +[shards.1] +# [ host, port, role ] +servers = [ + [ "127.0.0.1", 5432, "primary" ], + [ "localhost", 5433, "replica" ], + # [ "127.0.1.1", 5432, "replica" ], +] +database = "shard1" + +[shards.2] +# [ host, port, role ] +servers = [ + [ "127.0.0.1", 5432, "primary" ], + [ "localhost", 5433, "replica" ], + # [ "127.0.1.1", 5432, "replica" ], +] +database = "shard2" + + +# Settings for our query routing layer. +[query_router] + +# If the client doesn't specify, route traffic to +# this role by default. +# +# any: round-robin between primary and replicas, +# replica: round-robin between replicas only without touching the primary, +# primary: all queries go to the primary unless otherwise specified. +default_role = "any" + + +# Query parser. If enabled, we'll attempt to parse +# every incoming query to determine if it's a read or a write. +# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write, +# we'll direct it to the primary. +query_parser_enabled = false + +# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for +# load balancing of read queries. Otherwise, the primary will only be used for write +# queries. The primary can always be explicitely selected with our custom protocol. +primary_reads_enabled = true + +# So what if you wanted to implement a different hashing function, +# or you've already built one and you want this pooler to use it? +# +# Current options: +# +# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function) +# sha1: A hashing function based on SHA1 +# +sharding_function = "pg_bigint_hash" diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 2204b24..73a237b 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -3,20 +3,34 @@ set -e set -o xtrace +# Start PgCat with a particular log level +# for inspection. +function start_pgcat() { + kill -s SIGINT $(pgrep pgcat) || true + RUST_LOG=${1} ./target/debug/pgcat .circleci/pgcat.toml & + sleep 1 +} + +# Setup the database with shards and user psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql -./target/debug/pgcat & +# Install Toxiproxy to simulate a downed/slow database +wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb +sudo dpkg -i toxiproxy-2.1.4.deb +# Start Toxiproxy +toxiproxy-server & sleep 1 -# Setup PgBench -pgbench -i -h 127.0.0.1 -p 6432 +# Create a database at port 5433, forward it to Postgres +toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica -# Run it -pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple +start_pgcat "info" -# Extended protocol -pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended +# pgbench test +pgbench -i -h 127.0.0.1 -p 6432 && \ + pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \ + pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended # COPY TO STDOUT test psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null @@ -35,18 +49,37 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > / psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null # -# ActiveRecord tests! +# ActiveRecord tests # -cd tests/ruby -sudo gem install bundler -bundle install -ruby tests.rb +cd tests/ruby && \ + sudo gem install bundler && \ + bundle install && \ + ruby tests.rb && \ +cd ../.. + +# Start PgCat in debug to demonstrate failover better +start_pgcat "debug" + +# Add latency to the replica at port 5433 slightly above the healthcheck timeout +toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica +sleep 1 + +# Note the failover in the logs +timeout 5 psql -e -h 127.0.0.1 -p 6432 <<-EOF +SELECT 1; +SELECT 1; +SELECT 1; +EOF + +# Remove latency +toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica + +start_pgcat "info" -cd ../../ # Test session mode (and config reload) sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' pgcat.toml -# Reload config +# Reload config test kill -SIGHUP $(pgrep pgcat) # Prepared statements that will only work in session mode diff --git a/README.md b/README.md index b05eaaf..2cc7586 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,8 @@ See [sharding README](./tests/sharding/README.md) for sharding logic testing. | Session pooling | :white_check_mark: | :white_check_mark: | Tested by running pgbench with `--protocol prepared` which only works in session mode. | | `COPY` | :white_check_mark: | :white_check_mark: | `pgbench -i` uses `COPY`. `COPY FROM` is tested as well. | | Query cancellation | :white_check_mark: | :white_check_mark: | `psql -c 'SELECT pg_sleep(1000);'` and press `Ctrl-C`. | -| Load balancing | :x: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. | -| Failover | :x: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing could include using Toxiproxy. | +| Load balancing | :white_check_mark: | :white_check_mark: | We could test this by emitting statistics for each replica and compare them. | +| Failover | :white_check_mark: | :white_check_mark: | Misconfigure a replica in `pgcat.toml` and watch it forward queries to spares. CI testing is using Toxiproxy. | | Sharding | :white_check_mark: | :white_check_mark: | See `tests/sharding` and `tests/ruby` for an Rails/ActiveRecord example. | | Statistics reporting | :x: | :white_check_mark: | Run `nc -l -u 8125` and watch the stats come in every 15 seconds. | | Live config reloading | :white_check_mark: | :white_check_mark: | Run `kill -s SIGHUP $(pgrep pgcat)` and watch the config reload. | diff --git a/src/client.rs b/src/client.rs index fb1997d..b0ca7d1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -311,7 +311,7 @@ impl Client { // or until the client disconnects if we are in session mode. loop { let mut message = if message.len() == 0 { - debug!("Waiting for message inside transaction or in session mode"); + trace!("Waiting for message inside transaction or in session mode"); match read_message(&mut self.read).await { Ok(message) => message,