Compare commits

..

17 Commits

Author SHA1 Message Date
Jose Fernandez (magec)
16a2cece21 Allow failover for replicas 2024-11-12 17:43:36 +01:00
Mostafa
0ee59c0c40 Another no-op helm release (#853) 2024-11-08 06:07:12 -06:00
Mostafa
b61d2cc6f0 Use main branch for helm chart releases (#852) 2024-11-08 06:04:42 -06:00
Jose Fernández
c11418c083 Revert "Do not unban replicas if a primary is available" (#850)
Revert "Do not unban replicas if a primary is available (#843)"

This reverts commit cdcfa99fb9.
2024-11-07 22:00:43 +01:00
Jose Fernández
c9544bdff2 Fix default_role being ignored when query_parser_enabled was false (#847)
Fix default_role being ignored when query_parser_enabled was false
2024-11-07 11:11:49 -06:00
Jose Fernández
cdcfa99fb9 Do not unban replicas if a primary is available (#843)
Add `unban_replicas_when_all_banned` to control unbanning replicas behavior.
2024-11-07 11:11:11 -06:00
Víťa Tauer
f27dc6b483 Fixing invalid setting name in pgcat.toml (#849) 2024-11-07 06:17:09 -06:00
Mostafa
326efc22b3 Another no-op release for helm (#845)
Another no-op release
2024-11-02 18:05:41 -05:00
Mostafa
01c6afb2e5 Attempt a helm chart release (#844)
Attempt a release

Co-authored-by: Mostafa <no_reply@github.com>
2024-11-02 11:55:18 -05:00
Nicolas Vanelslande
a68071dd28 Bump bb8 from 0.8.1 to 0.8.6 (#709)
* Update bb8 to 0.8.6

To get https://github.com/djc/bb8/pull/186 and https://github.com/djc/bb8/pull/189
which fix potential deadlocks (https://github.com/djc/bb8/issues/154).

Also, this (https://github.com/djc/bb8/pull/225) was needed to prevent a connection
leak which was conveniently spotted in our integration tests.

* Ignore ./.bundle (created by dev console)

---------

Co-authored-by: Jose Fernandez (magec) <joseferper@gmail.com>
2024-10-28 06:49:36 -05:00
Mostafa
c27d801abf Rename a couple of variables (#839) 2024-10-23 06:38:07 -05:00
Javier Goday
186e72298f #829: read/write splitting on CTE mutable statements (#835) 2024-10-23 06:20:04 -05:00
Sebastian Serth
3935366d86 End Prometheus stats with a new line separator (#826)
End prometheus stats with a new line separator

According to the [OpenMetrics specification](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#overall-structure), each line MUST end with `\n`. Previously, the last line was not ending with `\n`, so that strict parsers had issues reading the Prometheus stats.
2024-09-22 17:14:04 -05:00
Sean McGivern
b575935b1d Improve documentation for connect_timeout and add min_pool_size (#822)
Currently, `connect_timeout` sounds like it should be for connections to
the Postgres server. It's actually used for obtaining a connection from
the pool.
2024-09-18 06:56:17 -05:00
Shijun Wang
efbab1c333 Helm chart improvements including allowing user password to be pulled from K8s secret (#753)
* Make user min_pool_size configurable

* Set user server_lifetime only if specified

* Increment chart version

* Use default instea of or

* Allow enabling server_tls

* statement_timeout default value

* Allow pulling password from existing secret

---------

Co-authored-by: Mostafa Abdelraouf <mostafa.mohmmed@gmail.com>
2024-09-14 09:57:17 -05:00
Mostafa Abdelraouf
9f12d7958e Fix Ruby tests (#819)
Build is failing with this error

Downloading activerecord-3.2.14 revealed dependencies not in the API or the
lockfile (activesupport (= 3.2.14), activemodel (= 3.2.14), arel (~> 3.0.2),
tzinfo (~> 0.3.29)).
Either installing with `--full-index` or running `bundle update activerecord`
should fix the problem.

After ActiveSupport was updated.

This PR fixes that
2024-09-13 20:02:38 -05:00
dependabot[bot]
e6634ef461 chore(deps): bump activesupport from 7.0.4.1 to 7.0.7.1 in /tests/ruby (#804)
Bumps [activesupport](https://github.com/rails/rails) from 7.0.4.1 to 7.0.7.1.
- [Release notes](https://github.com/rails/rails/releases)
- [Changelog](https://github.com/rails/rails/blob/v7.2.1/activesupport/CHANGELOG.md)
- [Commits](https://github.com/rails/rails/compare/v7.0.4.1...v7.0.7.1)

---
updated-dependencies:
- dependency-name: activesupport
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-13 19:43:26 -05:00
17 changed files with 297 additions and 77 deletions

1
.gitignore vendored
View File

@@ -12,3 +12,4 @@ dev/cache
!dev/cache/.keepme
.venv
**/__pycache__
.bundle

View File

@@ -36,10 +36,11 @@ Port at which prometheus exporter listens on.
### connect_timeout
```
path: general.connect_timeout
default: 5000 # milliseconds
default: 1000 # milliseconds
```
How long to wait before aborting a server connection (ms).
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
### idle_timeout
```
@@ -308,6 +309,15 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified.
### replica_to_primary_failover_enabled
```
path: pools.<pool_name>.replica_to_primary_failover_enabled
default: "false"
```
If set to true, when the specified role is `replica` (either by setting `default_role` or manually)
and all replicas are banned, queries will be sent to the primary (until a replica is back online).
### prepared_statements_cache_size
```
path: general.prepared_statements_cache_size
@@ -462,10 +472,18 @@ path: pools.<pool_name>.users.<user_index>.pool_size
default: 9
```
Maximum number of server connections that can be established for this user
Maximum number of server connections that can be established for this user.
The maximum number of connection from a single Pgcat process to any database in the cluster
is the sum of pool_size across all users.
### min_pool_size
```
path: pools.<pool_name>.users.<user_index>.min_pool_size
default: 0
```
Minimum number of idle server connections to retain for this pool.
### statement_timeout
```
path: pools.<pool_name>.users.<user_index>.statement_timeout
@@ -475,6 +493,16 @@ default: 0
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
0 means it is disabled.
### connect_timeout
```
path: pools.<pool_name>.users.<user_index>.connect_timeout
default: <UNSET> # milliseconds
```
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
If unset, uses the `connect_timeout` defined globally.
## `pools.<pool_name>.shards.<shard_index>` Section
### servers
@@ -502,4 +530,3 @@ default: "shard0"
```
Database name (e.g. "postgres")

23
Cargo.lock generated
View File

@@ -192,12 +192,11 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
[[package]]
name = "bb8"
version = "0.8.1"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
dependencies = [
"async-trait",
"futures-channel",
"futures-util",
"parking_lot",
"tokio",
@@ -1342,7 +1341,7 @@ checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36"
dependencies = [
"log",
"ring",
"rustls-webpki",
"rustls-webpki 0.101.1",
"sct",
]
@@ -1356,10 +1355,14 @@ dependencies = [
]
[[package]]
name = "rustls-pki-types"
version = "1.8.0"
name = "rustls-webpki"
version = "0.100.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
checksum = "e98ff011474fa39949b7e5c0428f9b4937eda7da7848bbb947786b7be0b27dab"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "rustls-webpki"
@@ -2035,11 +2038,11 @@ dependencies = [
[[package]]
name = "webpki-roots"
version = "0.26.5"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bd24728e5af82c6c4ec1b66ac4844bdf8156257fccda846ec58b42cd0cdbe6a"
checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
dependencies = [
"rustls-pki-types",
"rustls-webpki 0.100.2",
]
[[package]]

View File

@@ -8,7 +8,7 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
bytes = "1"
md-5 = "0.10"
bb8 = "0.8.1"
bb8 = "=0.8.6"
async-trait = "0.1"
rand = "0.8"
chrono = "0.4"
@@ -41,7 +41,7 @@ atomic_enum = "0.2.0"
postgres-protocol = "0.6.5"
fallible-iterator = "0.2"
pin-project = "1"
webpki-roots = "0.26"
webpki-roots = "0.23"
rustls = { version = "0.21", features = ["dangerous_configuration"] }
trust-dns-resolver = "0.22.0"
tokio-test = "0.4.2"

View File

@@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
### Failover
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If `replica_to_primary_failover_enabled` is set to true and all replicas become banned, the query will be routed to the primary. If `replica_to_primary_failover_enabled` is false and all servers (replicas) become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
The ban time can be changed with `ban_time`. The default is 60 seconds.

View File

@@ -5,4 +5,4 @@ maintainers:
- name: Wildcard
email: support@w6d.io
appVersion: "1.2.0"
version: 0.2.0
version: 0.2.4

View File

@@ -15,6 +15,7 @@ stringData:
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
server_tls = {{ .Values.configuration.general.server_tls }}
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
@@ -58,11 +59,21 @@ stringData:
##
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
username = {{ $user.username | quote }}
{{- if $user.password }}
password = {{ $user.password | quote }}
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
{{- if $secret }}
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
password = {{ $password | quote }}
{{- end }}
{{- end }}
pool_size = {{ $user.pool_size }}
statement_timeout = {{ $user.statement_timeout }}
min_pool_size = 3
server_lifetime = 60000
statement_timeout = {{ default 0 $user.statement_timeout }}
min_pool_size = {{ default 3 $user.min_pool_size }}
{{- if $user.server_lifetime }}
server_lifetime = {{ $user.server_lifetime }}
{{- end }}
{{- if and $user.server_username $user.server_password }}
server_username = {{ $user.server_username | quote }}
server_password = {{ $user.server_password | quote }}

View File

@@ -175,6 +175,9 @@ configuration:
# Max connection lifetime before it's closed, even if actively used.
server_lifetime: 86400000 # 24 hours
# Whether to use TLS for server connections or not.
server_tls: false
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout: 0 # milliseconds
@@ -315,7 +318,9 @@ configuration:
# ## Credentials for users that may connect to this cluster
# ## @param users [array]
# ## @param users[0].username Name of the env var (required)
# ## @param users[0].password Value for the env var (required)
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
# ## @param users[0].passwordSecret.name Name of the secret containing the password
# ## @param users[0].passwordSecret.key Key in the secret containing the password
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
# users: []

View File

@@ -1 +1,2 @@
sign: false
pages_branch: main

View File

@@ -179,7 +179,7 @@ primary_reads_enabled = true
# `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# no_shard_specified_behavior = "shard_0"
# default_shard = "shard_0"
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?

View File

@@ -881,6 +881,7 @@ where
};
query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries

View File

@@ -541,6 +541,9 @@ pub struct Pool {
#[serde(default = "Pool::default_default_role")]
pub default_role: String,
#[serde(default)] // False
pub replica_to_primary_failover_enabled: bool,
#[serde(default)] // False
pub query_parser_enabled: bool,
@@ -734,6 +737,7 @@ impl Default for Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
default_role: String::from("any"),
replica_to_primary_failover_enabled: false,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,

View File

@@ -162,6 +162,9 @@ pub struct PoolSettings {
// Default server role to connect to.
pub default_role: Option<Role>,
// Whether or not we should use primary when replicas are unavailable
pub replica_to_primary_failover_enabled: bool,
// Enable/disable query parser.
pub query_parser_enabled: bool,
@@ -219,6 +222,7 @@ impl Default for PoolSettings {
user: User::default(),
db: String::default(),
default_role: None,
replica_to_primary_failover_enabled: false,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
@@ -531,6 +535,8 @@ impl ConnectionPool {
"primary" => Some(Role::Primary),
_ => unreachable!(),
},
replica_to_primary_failover_enabled: pool_config
.replica_to_primary_failover_enabled,
query_parser_enabled: pool_config.query_parser_enabled,
query_parser_max_length: pool_config.query_parser_max_length,
query_parser_read_write_splitting: pool_config
@@ -731,6 +737,19 @@ impl ConnectionPool {
});
}
// If the role is replica and we allow sending traffic to primary when replicas are unavailble,
// we add primary address at the end of the list of candidates, this way it will be tried when
// replicas are all unavailable.
if role == Role::Replica && self.settings.replica_to_primary_failover_enabled {
let mut primaries = self
.addresses
.iter()
.flatten()
.filter(|address| address.role == Role::Primary)
.collect::<Vec<&Address>>();
candidates.insert(0, primaries.pop().unwrap());
}
// Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
client_stats.waiting();
@@ -935,24 +954,28 @@ impl ConnectionPool {
return true;
}
// Check if all replicas are banned, in that case unban all of them
let replicas_available = self.addresses[address.shard]
.iter()
.filter(|addr| addr.role == Role::Replica)
.count();
// If we have replica to primary failover we should not unban replicas
// as we still have the primary to server traffic.
if !self.settings.replica_to_primary_failover_enabled {
// Check if all replicas are banned, in that case unban all of them
let replicas_available = self.addresses[address.shard]
.iter()
.filter(|addr| addr.role == Role::Replica)
.count();
debug!("Available targets: {}", replicas_available);
debug!("Available targets: {}", replicas_available);
let read_guard = self.banlist.read();
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
drop(read_guard);
let read_guard = self.banlist.read();
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
drop(read_guard);
if all_replicas_banned {
let mut write_guard = self.banlist.write();
warn!("Unbanning all replicas.");
write_guard[address.shard].clear();
if all_replicas_banned {
let mut write_guard = self.banlist.write();
warn!("Unbanning all replicas.");
write_guard[address.shard].clear();
return true;
return true;
}
}
// Check if ban time is expired

View File

@@ -309,6 +309,7 @@ async fn prometheus_stats(
push_pool_stats(&mut lines);
push_server_stats(&mut lines);
push_database_stats(&mut lines);
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
Response::builder()
.header("content-type", "text/plain; version=0.0.4")

View File

@@ -386,6 +386,18 @@ impl QueryRouter {
}
}
/// Determines if a query is a mutation or not.
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
use sqlparser::ast::*;
match q.body.as_ref() {
SetExpr::Insert(_) => true,
SetExpr::Update(_) => true,
SetExpr::Query(q) => Self::is_mutation_query(q),
_ => false,
}
}
/// Try to infer which server to connect to based on the contents of the query.
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
if !self.pool_settings.query_parser_read_write_splitting {
@@ -428,8 +440,9 @@ impl QueryRouter {
};
let has_locks = !query.locks.is_empty();
let has_mutation = Self::is_mutation_query(query);
if has_locks {
if has_locks || has_mutation {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary.
@@ -1048,6 +1061,11 @@ impl QueryRouter {
self.active_shard
}
/// Set active_role as the default_role specified in the pool.
pub fn set_default_role(&mut self) {
self.active_role = self.pool_settings.default_role;
}
/// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> {
self.active_role
@@ -1113,6 +1131,26 @@ mod test {
assert_eq!(qr.role(), None);
}
#[test]
fn test_split_cte_queries() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
let query = simple_query(
"WITH t AS (
SELECT id FROM users WHERE name ILIKE '%ja%'
)
UPDATE user_languages
SET settings = '{}'
FROM t WHERE t.id = user_id;",
);
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[test]
fn test_infer_replica() {
QueryRouter::setup();
@@ -1421,6 +1459,7 @@ mod test {
load_balancing_mode: crate::config::LoadBalancingMode::Random,
shards: 2,
user: crate::config::User::default(),
replica_to_primary_failover_enabled: false,
default_role: Some(Role::Replica),
query_parser_enabled: true,
query_parser_max_length: None,
@@ -1500,6 +1539,7 @@ mod test {
shards: 5,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
replica_to_primary_failover_enabled: false,
query_parser_enabled: true,
query_parser_max_length: None,
query_parser_read_write_splitting: true,

View File

@@ -1,22 +1,33 @@
GEM
remote: https://rubygems.org/
specs:
activemodel (7.0.4.1)
activesupport (= 7.0.4.1)
activerecord (7.0.4.1)
activemodel (= 7.0.4.1)
activesupport (= 7.0.4.1)
activesupport (7.0.4.1)
activemodel (7.1.4)
activesupport (= 7.1.4)
activerecord (7.1.4)
activemodel (= 7.1.4)
activesupport (= 7.1.4)
timeout (>= 0.4.0)
activesupport (7.1.4)
base64
bigdecimal
concurrent-ruby (~> 1.0, >= 1.0.2)
connection_pool (>= 2.2.5)
drb
i18n (>= 1.6, < 2)
minitest (>= 5.1)
mutex_m
tzinfo (~> 2.0)
ast (2.4.2)
concurrent-ruby (1.1.10)
base64 (0.2.0)
bigdecimal (3.1.8)
concurrent-ruby (1.3.4)
connection_pool (2.4.1)
diff-lcs (1.5.0)
i18n (1.12.0)
drb (2.2.1)
i18n (1.14.5)
concurrent-ruby (~> 1.0)
minitest (5.17.0)
minitest (5.25.1)
mutex_m (0.2.0)
parallel (1.22.1)
parser (3.1.2.0)
ast (~> 2.4.1)
@@ -52,10 +63,11 @@ GEM
parser (>= 3.1.1.0)
ruby-progressbar (1.11.0)
strscan (3.1.0)
timeout (0.4.1)
toml (0.3.0)
parslet (>= 1.8.0, < 3.0.0)
toxiproxy (2.0.1)
tzinfo (2.0.5)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode-display_width (2.1.0)

View File

@@ -1,5 +1,5 @@
# frozen_string_literal: true
require_relative 'spec_helper'
require_relative "spec_helper"
describe "Random Load Balancing" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
@@ -8,7 +8,7 @@ describe "Random Load Balancing" do
processes.pgcat.shutdown
end
context "under regular circumstances" do
context("under regular circumstances") do
it "balances query volume between all instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -22,14 +22,14 @@ describe "Random Load Balancing" do
failed_count += 1
end
expect(failed_count).to eq(0)
expect(failed_count).to(eq(0))
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
end
end
end
context "when some replicas are down" do
context("when some replicas are down") do
it "balances query volume between working instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
@@ -49,9 +49,9 @@ describe "Random Load Balancing" do
processes.all_databases.each do |instance|
queries_routed = instance.count_select_1_plus_2
if processes.replicas[0..1].include?(instance)
expect(queries_routed).to eq(0)
expect(queries_routed).to(eq(0))
else
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
end
end
end
@@ -65,7 +65,7 @@ describe "Least Outstanding Queries Load Balancing" do
processes.pgcat.shutdown
end
context "under homogeneous load" do
context("under homogeneous load") do
it "balances query volume between all instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
@@ -79,15 +79,15 @@ describe "Least Outstanding Queries Load Balancing" do
failed_count += 1
end
expect(failed_count).to eq(0)
expect(failed_count).to(eq(0))
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
end
end
end
context "under heterogeneous load" do
xit "balances query volume between all instances based on how busy they are" do
context("under heterogeneous load") do
xit("balances query volume between all instances based on how busy they are") do
slow_query_count = 2
threads = Array.new(slow_query_count) do
Thread.new do
@@ -108,31 +108,32 @@ describe "Least Outstanding Queries Load Balancing" do
failed_count += 1
end
expect(failed_count).to eq(0)
expect(failed_count).to(eq(0))
# Under LOQ, we expect replicas running the slow pg_sleep
# to get no selects
expect(
processes.
all_databases.
map(&:count_select_1_plus_2).
count { |instance_share| instance_share == 0 }
).to eq(slow_query_count)
processes
.all_databases
.map(&:count_select_1_plus_2)
.count { |instance_share| instance_share == 0 }
)
.to(eq(slow_query_count))
# We also expect the quick queries to be spread across
# the idle servers only
processes.
all_databases.
map(&:count_select_1_plus_2).
reject { |instance_share| instance_share == 0 }.
each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
processes
.all_databases
.map(&:count_select_1_plus_2)
.reject { |instance_share| instance_share == 0 }
.each do |instance_share|
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
end
threads.map(&:join)
end
end
context "when some replicas are down" do
context("when some replicas are down") do
it "balances query volume between working instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
@@ -149,16 +150,106 @@ describe "Least Outstanding Queries Load Balancing" do
end
end
expect(failed_count).to be <= 2
expect(failed_count).to(be <= 2)
processes.all_databases.each do |instance|
queries_routed = instance.count_select_1_plus_2
if processes.replicas[0..1].include?(instance)
expect(queries_routed).to eq(0)
expect(queries_routed).to(eq(0))
else
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
end
end
end
end
end
describe "Candidate filtering based on `default_pool`" do
let(:processes) {
Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", pool_settings)
}
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
context("with default_pool set to replicas") do
context("when all replicas are down ") do
let(:pool_settings) do
{
"default_role" => "replica",
"replica_to_primary_failover_enabled" => replica_to_primary_failover_enabled
}
end
context("with `replica_to_primary_failover_enabled` set to false`") do
let(:replica_to_primary_failover_enabled) { false }
it(
"unbans them automatically to prevent false positives in health checks that could make all replicas unavailable"
) do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length
# Take down all replicas
processes[:replicas].each(&:take_down)
(number_of_replicas + 1).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to(eq(number_of_replicas + 1))
failed_count = 0
# Ban_time is configured to 60 so this reset will only work
# if the replicas are unbanned automatically
processes[:replicas].each(&:reset)
number_of_replicas.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to(eq(0))
end
end
context("with `replica_to_primary_failover_enabled` set to true`") do
let(:replica_to_primary_failover_enabled) { true }
it "does not unbans them automatically" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length
# We need to allow pgcat to open connections to replicas
(number_of_replicas + 10).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to(eq(0))
# Take down all replicas
processes[:replicas].each(&:take_down)
(number_of_replicas + 10).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to(eq(number_of_replicas))
end
end
end
end
end