mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
Compare commits
1 Commits
circleci_O
...
mostafa_sq
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6e11e11da |
2
.github/workflows/chart-release.yaml
vendored
2
.github/workflows/chart-release.yaml
vendored
@@ -32,7 +32,7 @@ jobs:
|
|||||||
version: v3.13.0
|
version: v3.13.0
|
||||||
|
|
||||||
- name: Run chart-releaser
|
- name: Run chart-releaser
|
||||||
uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
|
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
|
||||||
with:
|
with:
|
||||||
charts_dir: charts
|
charts_dir: charts
|
||||||
config: cr.yaml
|
config: cr.yaml
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -12,4 +12,3 @@ dev/cache
|
|||||||
!dev/cache/.keepme
|
!dev/cache/.keepme
|
||||||
.venv
|
.venv
|
||||||
**/__pycache__
|
**/__pycache__
|
||||||
.bundle
|
|
||||||
35
CONFIG.md
35
CONFIG.md
@@ -36,11 +36,10 @@ Port at which prometheus exporter listens on.
|
|||||||
### connect_timeout
|
### connect_timeout
|
||||||
```
|
```
|
||||||
path: general.connect_timeout
|
path: general.connect_timeout
|
||||||
default: 1000 # milliseconds
|
default: 5000 # milliseconds
|
||||||
```
|
```
|
||||||
|
|
||||||
How long the client waits to obtain a server connection before aborting (ms).
|
How long to wait before aborting a server connection (ms).
|
||||||
This is similar to PgBouncer's `query_wait_timeout`.
|
|
||||||
|
|
||||||
### idle_timeout
|
### idle_timeout
|
||||||
```
|
```
|
||||||
@@ -309,15 +308,6 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
|
|||||||
`replica` round-robin between replicas only without touching the primary,
|
`replica` round-robin between replicas only without touching the primary,
|
||||||
`primary` all queries go to the primary unless otherwise specified.
|
`primary` all queries go to the primary unless otherwise specified.
|
||||||
|
|
||||||
### replica_to_primary_failover_enabled
|
|
||||||
```
|
|
||||||
path: pools.<pool_name>.replica_to_primary_failover_enabled
|
|
||||||
default: "false"
|
|
||||||
```
|
|
||||||
|
|
||||||
If set to true, when the specified role is `replica` (either by setting `default_role` or manually)
|
|
||||||
and all replicas are banned, queries will be sent to the primary (until a replica is back online).
|
|
||||||
|
|
||||||
### prepared_statements_cache_size
|
### prepared_statements_cache_size
|
||||||
```
|
```
|
||||||
path: general.prepared_statements_cache_size
|
path: general.prepared_statements_cache_size
|
||||||
@@ -472,18 +462,10 @@ path: pools.<pool_name>.users.<user_index>.pool_size
|
|||||||
default: 9
|
default: 9
|
||||||
```
|
```
|
||||||
|
|
||||||
Maximum number of server connections that can be established for this user.
|
Maximum number of server connections that can be established for this user
|
||||||
The maximum number of connection from a single Pgcat process to any database in the cluster
|
The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||||
is the sum of pool_size across all users.
|
is the sum of pool_size across all users.
|
||||||
|
|
||||||
### min_pool_size
|
|
||||||
```
|
|
||||||
path: pools.<pool_name>.users.<user_index>.min_pool_size
|
|
||||||
default: 0
|
|
||||||
```
|
|
||||||
|
|
||||||
Minimum number of idle server connections to retain for this pool.
|
|
||||||
|
|
||||||
### statement_timeout
|
### statement_timeout
|
||||||
```
|
```
|
||||||
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
||||||
@@ -493,16 +475,6 @@ default: 0
|
|||||||
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
0 means it is disabled.
|
0 means it is disabled.
|
||||||
|
|
||||||
### connect_timeout
|
|
||||||
```
|
|
||||||
path: pools.<pool_name>.users.<user_index>.connect_timeout
|
|
||||||
default: <UNSET> # milliseconds
|
|
||||||
```
|
|
||||||
|
|
||||||
How long the client waits to obtain a server connection before aborting (ms).
|
|
||||||
This is similar to PgBouncer's `query_wait_timeout`.
|
|
||||||
If unset, uses the `connect_timeout` defined globally.
|
|
||||||
|
|
||||||
## `pools.<pool_name>.shards.<shard_index>` Section
|
## `pools.<pool_name>.shards.<shard_index>` Section
|
||||||
|
|
||||||
### servers
|
### servers
|
||||||
@@ -530,3 +502,4 @@ default: "shard0"
|
|||||||
```
|
```
|
||||||
|
|
||||||
Database name (e.g. "postgres")
|
Database name (e.g. "postgres")
|
||||||
|
|
||||||
|
|||||||
9
Cargo.lock
generated
9
Cargo.lock
generated
@@ -192,11 +192,12 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bb8"
|
name = "bb8"
|
||||||
version = "0.8.6"
|
version = "0.8.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
|
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"futures-channel",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"tokio",
|
"tokio",
|
||||||
@@ -1525,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlparser"
|
name = "sqlparser"
|
||||||
version = "0.41.0"
|
version = "0.51.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
"sqlparser_derive",
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ edition = "2021"
|
|||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
md-5 = "0.10"
|
md-5 = "0.10"
|
||||||
bb8 = "=0.8.6"
|
bb8 = "0.8.1"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
|||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
sqlparser = { version = "0.51", features = ["visitor"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
parking_lot = "0.12.1"
|
parking_lot = "0.12.1"
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
|
|||||||
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
|
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
|
||||||
|
|
||||||
### Failover
|
### Failover
|
||||||
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If `replica_to_primary_failover_enabled` is set to true and all replicas become banned, the query will be routed to the primary. If `replica_to_primary_failover_enabled` is false and all servers (replicas) become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
|
||||||
|
|
||||||
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
The ban time can be changed with `ban_time`. The default is 60 seconds.
|
||||||
|
|
||||||
|
|||||||
@@ -5,4 +5,4 @@ maintainers:
|
|||||||
- name: Wildcard
|
- name: Wildcard
|
||||||
email: support@w6d.io
|
email: support@w6d.io
|
||||||
appVersion: "1.2.0"
|
appVersion: "1.2.0"
|
||||||
version: 0.2.4
|
version: 0.2.0
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ stringData:
|
|||||||
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
||||||
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
||||||
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
||||||
server_tls = {{ .Values.configuration.general.server_tls }}
|
|
||||||
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
||||||
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
||||||
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
||||||
@@ -59,21 +58,11 @@ stringData:
|
|||||||
##
|
##
|
||||||
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
||||||
username = {{ $user.username | quote }}
|
username = {{ $user.username | quote }}
|
||||||
{{- if $user.password }}
|
|
||||||
password = {{ $user.password | quote }}
|
password = {{ $user.password | quote }}
|
||||||
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
|
|
||||||
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
|
|
||||||
{{- if $secret }}
|
|
||||||
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
|
|
||||||
password = {{ $password | quote }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
pool_size = {{ $user.pool_size }}
|
pool_size = {{ $user.pool_size }}
|
||||||
statement_timeout = {{ default 0 $user.statement_timeout }}
|
statement_timeout = {{ $user.statement_timeout }}
|
||||||
min_pool_size = {{ default 3 $user.min_pool_size }}
|
min_pool_size = 3
|
||||||
{{- if $user.server_lifetime }}
|
server_lifetime = 60000
|
||||||
server_lifetime = {{ $user.server_lifetime }}
|
|
||||||
{{- end }}
|
|
||||||
{{- if and $user.server_username $user.server_password }}
|
{{- if and $user.server_username $user.server_password }}
|
||||||
server_username = {{ $user.server_username | quote }}
|
server_username = {{ $user.server_username | quote }}
|
||||||
server_password = {{ $user.server_password | quote }}
|
server_password = {{ $user.server_password | quote }}
|
||||||
|
|||||||
@@ -175,9 +175,6 @@ configuration:
|
|||||||
# Max connection lifetime before it's closed, even if actively used.
|
# Max connection lifetime before it's closed, even if actively used.
|
||||||
server_lifetime: 86400000 # 24 hours
|
server_lifetime: 86400000 # 24 hours
|
||||||
|
|
||||||
# Whether to use TLS for server connections or not.
|
|
||||||
server_tls: false
|
|
||||||
|
|
||||||
# How long a client is allowed to be idle while in a transaction (ms).
|
# How long a client is allowed to be idle while in a transaction (ms).
|
||||||
idle_client_in_transaction_timeout: 0 # milliseconds
|
idle_client_in_transaction_timeout: 0 # milliseconds
|
||||||
|
|
||||||
@@ -318,9 +315,7 @@ configuration:
|
|||||||
# ## Credentials for users that may connect to this cluster
|
# ## Credentials for users that may connect to this cluster
|
||||||
# ## @param users [array]
|
# ## @param users [array]
|
||||||
# ## @param users[0].username Name of the env var (required)
|
# ## @param users[0].username Name of the env var (required)
|
||||||
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
|
# ## @param users[0].password Value for the env var (required)
|
||||||
# ## @param users[0].passwordSecret.name Name of the secret containing the password
|
|
||||||
# ## @param users[0].passwordSecret.key Key in the secret containing the password
|
|
||||||
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
||||||
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
# users: []
|
# users: []
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ primary_reads_enabled = true
|
|||||||
# `random`: picks a shard at random
|
# `random`: picks a shard at random
|
||||||
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
||||||
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
||||||
# default_shard = "shard_0"
|
# no_shard_specified_behavior = "shard_0"
|
||||||
|
|
||||||
# So what if you wanted to implement a different hashing function,
|
# So what if you wanted to implement a different hashing function,
|
||||||
# or you've already built one and you want this pooler to use it?
|
# or you've already built one and you want this pooler to use it?
|
||||||
|
|||||||
@@ -881,7 +881,6 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
query_router.update_pool_settings(&pool.settings);
|
query_router.update_pool_settings(&pool.settings);
|
||||||
query_router.set_default_role();
|
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
|
|||||||
@@ -541,9 +541,6 @@ pub struct Pool {
|
|||||||
#[serde(default = "Pool::default_default_role")]
|
#[serde(default = "Pool::default_default_role")]
|
||||||
pub default_role: String,
|
pub default_role: String,
|
||||||
|
|
||||||
#[serde(default)] // False
|
|
||||||
pub replica_to_primary_failover_enabled: bool,
|
|
||||||
|
|
||||||
#[serde(default)] // False
|
#[serde(default)] // False
|
||||||
pub query_parser_enabled: bool,
|
pub query_parser_enabled: bool,
|
||||||
|
|
||||||
@@ -737,7 +734,6 @@ impl Default for Pool {
|
|||||||
pool_mode: Self::default_pool_mode(),
|
pool_mode: Self::default_pool_mode(),
|
||||||
load_balancing_mode: Self::default_load_balancing_mode(),
|
load_balancing_mode: Self::default_load_balancing_mode(),
|
||||||
default_role: String::from("any"),
|
default_role: String::from("any"),
|
||||||
replica_to_primary_failover_enabled: false,
|
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
query_parser_read_write_splitting: false,
|
query_parser_read_write_splitting: false,
|
||||||
|
|||||||
51
src/pool.rs
51
src/pool.rs
@@ -162,9 +162,6 @@ pub struct PoolSettings {
|
|||||||
// Default server role to connect to.
|
// Default server role to connect to.
|
||||||
pub default_role: Option<Role>,
|
pub default_role: Option<Role>,
|
||||||
|
|
||||||
// Whether or not we should use primary when replicas are unavailable
|
|
||||||
pub replica_to_primary_failover_enabled: bool,
|
|
||||||
|
|
||||||
// Enable/disable query parser.
|
// Enable/disable query parser.
|
||||||
pub query_parser_enabled: bool,
|
pub query_parser_enabled: bool,
|
||||||
|
|
||||||
@@ -222,7 +219,6 @@ impl Default for PoolSettings {
|
|||||||
user: User::default(),
|
user: User::default(),
|
||||||
db: String::default(),
|
db: String::default(),
|
||||||
default_role: None,
|
default_role: None,
|
||||||
replica_to_primary_failover_enabled: false,
|
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
query_parser_read_write_splitting: false,
|
query_parser_read_write_splitting: false,
|
||||||
@@ -535,8 +531,6 @@ impl ConnectionPool {
|
|||||||
"primary" => Some(Role::Primary),
|
"primary" => Some(Role::Primary),
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
},
|
},
|
||||||
replica_to_primary_failover_enabled: pool_config
|
|
||||||
.replica_to_primary_failover_enabled,
|
|
||||||
query_parser_enabled: pool_config.query_parser_enabled,
|
query_parser_enabled: pool_config.query_parser_enabled,
|
||||||
query_parser_max_length: pool_config.query_parser_max_length,
|
query_parser_max_length: pool_config.query_parser_max_length,
|
||||||
query_parser_read_write_splitting: pool_config
|
query_parser_read_write_splitting: pool_config
|
||||||
@@ -737,19 +731,6 @@ impl ConnectionPool {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the role is replica and we allow sending traffic to primary when replicas are unavailble,
|
|
||||||
// we add primary address at the end of the list of candidates, this way it will be tried when
|
|
||||||
// replicas are all unavailable.
|
|
||||||
if role == Role::Replica && self.settings.replica_to_primary_failover_enabled {
|
|
||||||
let mut primaries = self
|
|
||||||
.addresses
|
|
||||||
.iter()
|
|
||||||
.flatten()
|
|
||||||
.filter(|address| address.role == Role::Primary)
|
|
||||||
.collect::<Vec<&Address>>();
|
|
||||||
candidates.insert(0, primaries.pop().unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Indicate we're waiting on a server connection from a pool.
|
// Indicate we're waiting on a server connection from a pool.
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
client_stats.waiting();
|
client_stats.waiting();
|
||||||
@@ -954,28 +935,24 @@ impl ConnectionPool {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we have replica to primary failover we should not unban replicas
|
// Check if all replicas are banned, in that case unban all of them
|
||||||
// as we still have the primary to server traffic.
|
let replicas_available = self.addresses[address.shard]
|
||||||
if !self.settings.replica_to_primary_failover_enabled {
|
.iter()
|
||||||
// Check if all replicas are banned, in that case unban all of them
|
.filter(|addr| addr.role == Role::Replica)
|
||||||
let replicas_available = self.addresses[address.shard]
|
.count();
|
||||||
.iter()
|
|
||||||
.filter(|addr| addr.role == Role::Replica)
|
|
||||||
.count();
|
|
||||||
|
|
||||||
debug!("Available targets: {}", replicas_available);
|
debug!("Available targets: {}", replicas_available);
|
||||||
|
|
||||||
let read_guard = self.banlist.read();
|
let read_guard = self.banlist.read();
|
||||||
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
|
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
|
||||||
drop(read_guard);
|
drop(read_guard);
|
||||||
|
|
||||||
if all_replicas_banned {
|
if all_replicas_banned {
|
||||||
let mut write_guard = self.banlist.write();
|
let mut write_guard = self.banlist.write();
|
||||||
warn!("Unbanning all replicas.");
|
warn!("Unbanning all replicas.");
|
||||||
write_guard[address.shard].clear();
|
write_guard[address.shard].clear();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if ban time is expired
|
// Check if ban time is expired
|
||||||
|
|||||||
@@ -309,7 +309,6 @@ async fn prometheus_stats(
|
|||||||
push_pool_stats(&mut lines);
|
push_pool_stats(&mut lines);
|
||||||
push_server_stats(&mut lines);
|
push_server_stats(&mut lines);
|
||||||
push_database_stats(&mut lines);
|
push_database_stats(&mut lines);
|
||||||
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
|
|
||||||
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.header("content-type", "text/plain; version=0.0.4")
|
.header("content-type", "text/plain; version=0.0.4")
|
||||||
|
|||||||
@@ -386,18 +386,6 @@ impl QueryRouter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Determines if a query is a mutation or not.
|
|
||||||
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
|
|
||||||
use sqlparser::ast::*;
|
|
||||||
|
|
||||||
match q.body.as_ref() {
|
|
||||||
SetExpr::Insert(_) => true,
|
|
||||||
SetExpr::Update(_) => true,
|
|
||||||
SetExpr::Query(q) => Self::is_mutation_query(q),
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to infer which server to connect to based on the contents of the query.
|
/// Try to infer which server to connect to based on the contents of the query.
|
||||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
||||||
if !self.pool_settings.query_parser_read_write_splitting {
|
if !self.pool_settings.query_parser_read_write_splitting {
|
||||||
@@ -440,9 +428,8 @@ impl QueryRouter {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let has_locks = !query.locks.is_empty();
|
let has_locks = !query.locks.is_empty();
|
||||||
let has_mutation = Self::is_mutation_query(query);
|
|
||||||
|
|
||||||
if has_locks || has_mutation {
|
if has_locks {
|
||||||
self.active_role = Some(Role::Primary);
|
self.active_role = Some(Role::Primary);
|
||||||
} else if !visited_write_statement {
|
} else if !visited_write_statement {
|
||||||
// If we already visited a write statement, we should be going to the primary.
|
// If we already visited a write statement, we should be going to the primary.
|
||||||
@@ -1061,11 +1048,6 @@ impl QueryRouter {
|
|||||||
self.active_shard
|
self.active_shard
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set active_role as the default_role specified in the pool.
|
|
||||||
pub fn set_default_role(&mut self) {
|
|
||||||
self.active_role = self.pool_settings.default_role;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the current desired server role we should be talking to.
|
/// Get the current desired server role we should be talking to.
|
||||||
pub fn role(&self) -> Option<Role> {
|
pub fn role(&self) -> Option<Role> {
|
||||||
self.active_role
|
self.active_role
|
||||||
@@ -1131,26 +1113,6 @@ mod test {
|
|||||||
assert_eq!(qr.role(), None);
|
assert_eq!(qr.role(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_split_cte_queries() {
|
|
||||||
QueryRouter::setup();
|
|
||||||
let mut qr = QueryRouter::new();
|
|
||||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
|
||||||
qr.pool_settings.query_parser_enabled = true;
|
|
||||||
|
|
||||||
let query = simple_query(
|
|
||||||
"WITH t AS (
|
|
||||||
SELECT id FROM users WHERE name ILIKE '%ja%'
|
|
||||||
)
|
|
||||||
UPDATE user_languages
|
|
||||||
SET settings = '{}'
|
|
||||||
FROM t WHERE t.id = user_id;",
|
|
||||||
);
|
|
||||||
let ast = qr.parse(&query).unwrap();
|
|
||||||
assert!(qr.infer(&ast).is_ok());
|
|
||||||
assert_eq!(qr.role(), Some(Role::Primary));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_infer_replica() {
|
fn test_infer_replica() {
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
@@ -1459,7 +1421,6 @@ mod test {
|
|||||||
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
load_balancing_mode: crate::config::LoadBalancingMode::Random,
|
||||||
shards: 2,
|
shards: 2,
|
||||||
user: crate::config::User::default(),
|
user: crate::config::User::default(),
|
||||||
replica_to_primary_failover_enabled: false,
|
|
||||||
default_role: Some(Role::Replica),
|
default_role: Some(Role::Replica),
|
||||||
query_parser_enabled: true,
|
query_parser_enabled: true,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
@@ -1539,7 +1500,6 @@ mod test {
|
|||||||
shards: 5,
|
shards: 5,
|
||||||
user: crate::config::User::default(),
|
user: crate::config::User::default(),
|
||||||
default_role: Some(Role::Replica),
|
default_role: Some(Role::Replica),
|
||||||
replica_to_primary_failover_enabled: false,
|
|
||||||
query_parser_enabled: true,
|
query_parser_enabled: true,
|
||||||
query_parser_max_length: None,
|
query_parser_max_length: None,
|
||||||
query_parser_read_write_splitting: true,
|
query_parser_read_write_splitting: true,
|
||||||
|
|||||||
@@ -1,33 +1,22 @@
|
|||||||
GEM
|
GEM
|
||||||
remote: https://rubygems.org/
|
remote: https://rubygems.org/
|
||||||
specs:
|
specs:
|
||||||
activemodel (7.1.4)
|
activemodel (7.0.4.1)
|
||||||
activesupport (= 7.1.4)
|
activesupport (= 7.0.4.1)
|
||||||
activerecord (7.1.4)
|
activerecord (7.0.4.1)
|
||||||
activemodel (= 7.1.4)
|
activemodel (= 7.0.4.1)
|
||||||
activesupport (= 7.1.4)
|
activesupport (= 7.0.4.1)
|
||||||
timeout (>= 0.4.0)
|
activesupport (7.0.4.1)
|
||||||
activesupport (7.1.4)
|
|
||||||
base64
|
|
||||||
bigdecimal
|
|
||||||
concurrent-ruby (~> 1.0, >= 1.0.2)
|
concurrent-ruby (~> 1.0, >= 1.0.2)
|
||||||
connection_pool (>= 2.2.5)
|
|
||||||
drb
|
|
||||||
i18n (>= 1.6, < 2)
|
i18n (>= 1.6, < 2)
|
||||||
minitest (>= 5.1)
|
minitest (>= 5.1)
|
||||||
mutex_m
|
|
||||||
tzinfo (~> 2.0)
|
tzinfo (~> 2.0)
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
base64 (0.2.0)
|
concurrent-ruby (1.1.10)
|
||||||
bigdecimal (3.1.8)
|
|
||||||
concurrent-ruby (1.3.4)
|
|
||||||
connection_pool (2.4.1)
|
|
||||||
diff-lcs (1.5.0)
|
diff-lcs (1.5.0)
|
||||||
drb (2.2.1)
|
i18n (1.12.0)
|
||||||
i18n (1.14.5)
|
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
minitest (5.25.1)
|
minitest (5.17.0)
|
||||||
mutex_m (0.2.0)
|
|
||||||
parallel (1.22.1)
|
parallel (1.22.1)
|
||||||
parser (3.1.2.0)
|
parser (3.1.2.0)
|
||||||
ast (~> 2.4.1)
|
ast (~> 2.4.1)
|
||||||
@@ -63,11 +52,10 @@ GEM
|
|||||||
parser (>= 3.1.1.0)
|
parser (>= 3.1.1.0)
|
||||||
ruby-progressbar (1.11.0)
|
ruby-progressbar (1.11.0)
|
||||||
strscan (3.1.0)
|
strscan (3.1.0)
|
||||||
timeout (0.4.1)
|
|
||||||
toml (0.3.0)
|
toml (0.3.0)
|
||||||
parslet (>= 1.8.0, < 3.0.0)
|
parslet (>= 1.8.0, < 3.0.0)
|
||||||
toxiproxy (2.0.1)
|
toxiproxy (2.0.1)
|
||||||
tzinfo (2.0.6)
|
tzinfo (2.0.5)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
unicode-display_width (2.1.0)
|
unicode-display_width (2.1.0)
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
require_relative "spec_helper"
|
require_relative 'spec_helper'
|
||||||
|
|
||||||
describe "Random Load Balancing" do
|
describe "Random Load Balancing" do
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||||
@@ -8,7 +8,7 @@ describe "Random Load Balancing" do
|
|||||||
processes.pgcat.shutdown
|
processes.pgcat.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
context("under regular circumstances") do
|
context "under regular circumstances" do
|
||||||
it "balances query volume between all instances" do
|
it "balances query volume between all instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
|
||||||
@@ -22,14 +22,14 @@ describe "Random Load Balancing" do
|
|||||||
failed_count += 1
|
failed_count += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to(eq(0))
|
expect(failed_count).to eq(0)
|
||||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||||
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context("when some replicas are down") do
|
context "when some replicas are down" do
|
||||||
it "balances query volume between working instances" do
|
it "balances query volume between working instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||||
@@ -49,9 +49,9 @@ describe "Random Load Balancing" do
|
|||||||
processes.all_databases.each do |instance|
|
processes.all_databases.each do |instance|
|
||||||
queries_routed = instance.count_select_1_plus_2
|
queries_routed = instance.count_select_1_plus_2
|
||||||
if processes.replicas[0..1].include?(instance)
|
if processes.replicas[0..1].include?(instance)
|
||||||
expect(queries_routed).to(eq(0))
|
expect(queries_routed).to eq(0)
|
||||||
else
|
else
|
||||||
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -65,7 +65,7 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
processes.pgcat.shutdown
|
processes.pgcat.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
context("under homogeneous load") do
|
context "under homogeneous load" do
|
||||||
it "balances query volume between all instances" do
|
it "balances query volume between all instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
|
||||||
@@ -79,15 +79,15 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
failed_count += 1
|
failed_count += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to(eq(0))
|
expect(failed_count).to eq(0)
|
||||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||||
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context("under heterogeneous load") do
|
context "under heterogeneous load" do
|
||||||
xit("balances query volume between all instances based on how busy they are") do
|
xit "balances query volume between all instances based on how busy they are" do
|
||||||
slow_query_count = 2
|
slow_query_count = 2
|
||||||
threads = Array.new(slow_query_count) do
|
threads = Array.new(slow_query_count) do
|
||||||
Thread.new do
|
Thread.new do
|
||||||
@@ -108,32 +108,31 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
failed_count += 1
|
failed_count += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to(eq(0))
|
expect(failed_count).to eq(0)
|
||||||
# Under LOQ, we expect replicas running the slow pg_sleep
|
# Under LOQ, we expect replicas running the slow pg_sleep
|
||||||
# to get no selects
|
# to get no selects
|
||||||
expect(
|
expect(
|
||||||
processes
|
processes.
|
||||||
.all_databases
|
all_databases.
|
||||||
.map(&:count_select_1_plus_2)
|
map(&:count_select_1_plus_2).
|
||||||
.count { |instance_share| instance_share == 0 }
|
count { |instance_share| instance_share == 0 }
|
||||||
)
|
).to eq(slow_query_count)
|
||||||
.to(eq(slow_query_count))
|
|
||||||
|
|
||||||
# We also expect the quick queries to be spread across
|
# We also expect the quick queries to be spread across
|
||||||
# the idle servers only
|
# the idle servers only
|
||||||
processes
|
processes.
|
||||||
.all_databases
|
all_databases.
|
||||||
.map(&:count_select_1_plus_2)
|
map(&:count_select_1_plus_2).
|
||||||
.reject { |instance_share| instance_share == 0 }
|
reject { |instance_share| instance_share == 0 }.
|
||||||
.each do |instance_share|
|
each do |instance_share|
|
||||||
expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||||
end
|
end
|
||||||
|
|
||||||
threads.map(&:join)
|
threads.map(&:join)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context("when some replicas are down") do
|
context "when some replicas are down" do
|
||||||
it "balances query volume between working instances" do
|
it "balances query volume between working instances" do
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||||
@@ -150,106 +149,16 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(failed_count).to(be <= 2)
|
expect(failed_count).to be <= 2
|
||||||
processes.all_databases.each do |instance|
|
processes.all_databases.each do |instance|
|
||||||
queries_routed = instance.count_select_1_plus_2
|
queries_routed = instance.count_select_1_plus_2
|
||||||
if processes.replicas[0..1].include?(instance)
|
if processes.replicas[0..1].include?(instance)
|
||||||
expect(queries_routed).to(eq(0))
|
expect(queries_routed).to eq(0)
|
||||||
else
|
else
|
||||||
expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
|
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Candidate filtering based on `default_pool`" do
|
|
||||||
let(:processes) {
|
|
||||||
Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", pool_settings)
|
|
||||||
}
|
|
||||||
|
|
||||||
after do
|
|
||||||
processes.all_databases.map(&:reset)
|
|
||||||
processes.pgcat.shutdown
|
|
||||||
end
|
|
||||||
|
|
||||||
context("with default_pool set to replicas") do
|
|
||||||
context("when all replicas are down ") do
|
|
||||||
let(:pool_settings) do
|
|
||||||
{
|
|
||||||
"default_role" => "replica",
|
|
||||||
"replica_to_primary_failover_enabled" => replica_to_primary_failover_enabled
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
context("with `replica_to_primary_failover_enabled` set to false`") do
|
|
||||||
let(:replica_to_primary_failover_enabled) { false }
|
|
||||||
|
|
||||||
it(
|
|
||||||
"unbans them automatically to prevent false positives in health checks that could make all replicas unavailable"
|
|
||||||
) do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count = 0
|
|
||||||
number_of_replicas = processes[:replicas].length
|
|
||||||
|
|
||||||
# Take down all replicas
|
|
||||||
processes[:replicas].each(&:take_down)
|
|
||||||
|
|
||||||
(number_of_replicas + 1).times do |n|
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to(eq(number_of_replicas + 1))
|
|
||||||
failed_count = 0
|
|
||||||
|
|
||||||
# Ban_time is configured to 60 so this reset will only work
|
|
||||||
# if the replicas are unbanned automatically
|
|
||||||
processes[:replicas].each(&:reset)
|
|
||||||
|
|
||||||
number_of_replicas.times do
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to(eq(0))
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context("with `replica_to_primary_failover_enabled` set to true`") do
|
|
||||||
let(:replica_to_primary_failover_enabled) { true }
|
|
||||||
|
|
||||||
it "does not unbans them automatically" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count = 0
|
|
||||||
number_of_replicas = processes[:replicas].length
|
|
||||||
|
|
||||||
# We need to allow pgcat to open connections to replicas
|
|
||||||
(number_of_replicas + 10).times do |n|
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
expect(failed_count).to(eq(0))
|
|
||||||
|
|
||||||
# Take down all replicas
|
|
||||||
processes[:replicas].each(&:take_down)
|
|
||||||
|
|
||||||
(number_of_replicas + 10).times do |n|
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to(eq(number_of_replicas))
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|||||||
Reference in New Issue
Block a user