Compare commits

..

12 Commits

Author SHA1 Message Date
Gabriel Simmer
2f8eb58bdb Update to latest sqlparser version 2024-11-17 00:50:38 +00:00
Mostafa
328c703e4a bump 2024-11-12 18:54:25 -06:00
dependabot[bot]
da044e8fa5 chore(deps): bump sqlparser from 0.41.0 to 0.52.0
Bumps [sqlparser](https://github.com/apache/datafusion-sqlparser-rs) from 0.41.0 to 0.52.0.
- [Changelog](https://github.com/apache/datafusion-sqlparser-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/apache/datafusion-sqlparser-rs/commits)

---
updated-dependencies:
- dependency-name: sqlparser
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-11-12 04:16:48 +00:00
Mostafa
3796e26402 Fix contact info for Helm chart (#861)
* Fix contact info for Helm chart
2024-11-11 09:24:17 -06:00
Mostafa
0ee59c0c40 Another no-op helm release (#853) 2024-11-08 06:07:12 -06:00
Mostafa
b61d2cc6f0 Use main branch for helm chart releases (#852) 2024-11-08 06:04:42 -06:00
Jose Fernández
c11418c083 Revert "Do not unban replicas if a primary is available" (#850)
Revert "Do not unban replicas if a primary is available (#843)"

This reverts commit cdcfa99fb9.
2024-11-07 22:00:43 +01:00
Jose Fernández
c9544bdff2 Fix default_role being ignored when query_parser_enabled was false (#847)
Fix default_role being ignored when query_parser_enabled was false
2024-11-07 11:11:49 -06:00
Jose Fernández
cdcfa99fb9 Do not unban replicas if a primary is available (#843)
Add `unban_replicas_when_all_banned` to control unbanning replicas behavior.
2024-11-07 11:11:11 -06:00
Víťa Tauer
f27dc6b483 Fixing invalid setting name in pgcat.toml (#849) 2024-11-07 06:17:09 -06:00
Mostafa
326efc22b3 Another no-op release for helm (#845)
Another no-op release
2024-11-02 18:05:41 -05:00
Mostafa
01c6afb2e5 Attempt a helm chart release (#844)
Attempt a release

Co-authored-by: Mostafa <no_reply@github.com>
2024-11-02 11:55:18 -05:00
12 changed files with 70 additions and 69 deletions

View File

@@ -130,16 +130,6 @@ default: 60 # seconds
How long to ban a server if it fails a health check (seconds).
### unban_replicas_when_all_banned
```
path: general.unban_replicas_when_all_banned
default: true
```
Whether or not we should unban all replicas when they are all banned. This is set
to true by default to prevent disconnection when we have replicas with a false positive
health check.
### log_client_connections
```
path: general.log_client_connections

4
Cargo.lock generated
View File

@@ -1525,9 +1525,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.41.0"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08"
dependencies = [
"log",
"sqlparser_derive",

View File

@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = { version = "0.41", features = ["visitor"] }
sqlparser = { version = "0.52", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"

View File

@@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.
### Failover
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the behavior is controlled by the configuration parameter `unban_replicas_when_all_banned`. If it is set to true (the default), the ban list is cleared: this is a safety precaution against false positives, if it is set to false, no replicas will be available until they become healthy. The primary can never be banned.
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
The ban time can be changed with `ban_time`. The default is 60 seconds.

View File

@@ -2,7 +2,7 @@ apiVersion: v2
name: pgcat
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
maintainers:
- name: Wildcard
email: support@w6d.io
- name: PostgresML
email: team@postgresml.org
appVersion: "1.2.0"
version: 0.2.1
version: 0.2.5

View File

@@ -1 +1,2 @@
sign: false
pages_branch: main

View File

@@ -179,7 +179,7 @@ primary_reads_enabled = true
# `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# no_shard_specified_behavior = "shard_0"
# default_shard = "shard_0"
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?

View File

@@ -881,6 +881,7 @@ where
};
query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries

View File

@@ -315,9 +315,6 @@ pub struct General {
#[serde(default = "General::default_ban_time")]
pub ban_time: i64,
#[serde(default)] // True
pub unban_replicas_when_all_banned: bool,
#[serde(default = "General::default_idle_client_in_transaction_timeout")]
pub idle_client_in_transaction_timeout: u64,
@@ -463,7 +460,6 @@ impl Default for General {
healthcheck_timeout: Self::default_healthcheck_timeout(),
healthcheck_delay: Self::default_healthcheck_delay(),
ban_time: Self::default_ban_time(),
unban_replicas_when_all_banned: true,
idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(),
server_lifetime: Self::default_server_lifetime(),
server_round_robin: Self::default_server_round_robin(),

View File

@@ -189,9 +189,6 @@ pub struct PoolSettings {
// Ban time
pub ban_time: i64,
// Should we automatically unban replicas when all are banned?
pub unban_replicas_when_all_banned: bool,
// Regex for searching for the sharding key in SQL statements
pub sharding_key_regex: Option<Regex>,
@@ -231,7 +228,6 @@ impl Default for PoolSettings {
healthcheck_delay: General::default_healthcheck_delay(),
healthcheck_timeout: General::default_healthcheck_timeout(),
ban_time: General::default_ban_time(),
unban_replicas_when_all_banned: true,
sharding_key_regex: None,
shard_id_regex: None,
regex_search_limit: 1000,
@@ -545,9 +541,6 @@ impl ConnectionPool {
healthcheck_delay: config.general.healthcheck_delay,
healthcheck_timeout: config.general.healthcheck_timeout,
ban_time: config.general.ban_time,
unban_replicas_when_all_banned: config
.general
.unban_replicas_when_all_banned,
sharding_key_regex: pool_config
.sharding_key_regex
.clone()
@@ -953,9 +946,8 @@ impl ConnectionPool {
let read_guard = self.banlist.read();
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
drop(read_guard);
let unban_replicas_when_all_banned = self.settings.clone().unban_replicas_when_all_banned;
if all_replicas_banned && unban_replicas_when_all_banned {
if all_replicas_banned {
let mut write_guard = self.banlist.write();
warn!("Unbanning all replicas.");
write_guard[address.shard].clear();

View File

@@ -504,55 +504,33 @@ impl QueryRouter {
let mut table_names = Vec::new();
match q {
Insert {
or,
into: _,
table_name,
columns,
overwrite: _,
source,
partitioned,
after_columns,
table: _,
on: _,
returning: _,
ignore: _,
} => {
Insert(i) => {
// Not supported in postgres.
assert!(or.is_none());
assert!(partitioned.is_none());
assert!(after_columns.is_empty());
assert!(i.or.is_none());
assert!(i.partitioned.is_none());
assert!(i.after_columns.is_empty());
Self::process_table(table_name, &mut table_names);
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
Self::process_table(&i.table_name, &mut table_names);
if let Some(source) = &i.source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns));
}
}
Delete {
tables,
from,
using,
selection,
returning: _,
order_by: _,
limit: _,
} => {
if let Some(expr) = selection {
Delete(d) => {
if let Some(expr) = &d.selection {
exprs.push(expr.clone());
}
// Multi tables delete are not supported in postgres.
assert!(tables.is_empty());
assert!(d.tables.is_empty());
Self::process_tables_with_join(from, &mut exprs, &mut table_names);
if let Some(using_tbl_with_join) = using {
if let Some(using_tbl_with_join) = &d.using {
Self::process_tables_with_join(
using_tbl_with_join,
&mut exprs,
&mut table_names,
);
}
Self::process_selection(selection, &mut exprs);
Self::process_selection(&d.selection, &mut exprs);
}
Update {
table,
@@ -822,7 +800,13 @@ impl QueryRouter {
for a in assignments {
if sharding_key[0].value == "*"
&& sharding_key[1].value == a.id.last().unwrap().value.to_lowercase()
&& sharding_key[1].value
== a.target
.to_string()
.split('.')
.last()
.unwrap()
.to_lowercase()
{
return Err(Error::QueryRouterParserError(
"Sharding key cannot be updated.".into(),
@@ -1061,6 +1045,11 @@ impl QueryRouter {
self.active_shard
}
/// Set active_role as the default_role specified in the pool.
pub fn set_default_role(&mut self) {
self.active_role = self.pool_settings.default_role;
}
/// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> {
self.active_role
@@ -1464,7 +1453,6 @@ mod test {
healthcheck_delay: PoolSettings::default().healthcheck_delay,
healthcheck_timeout: PoolSettings::default().healthcheck_timeout,
ban_time: PoolSettings::default().ban_time,
unban_replicas_when_all_banned: true,
sharding_key_regex: None,
shard_id_regex: None,
default_shard: crate::config::DefaultShard::Shard(0),
@@ -1543,7 +1531,6 @@ mod test {
healthcheck_delay: PoolSettings::default().healthcheck_delay,
healthcheck_timeout: PoolSettings::default().healthcheck_timeout,
ban_time: PoolSettings::default().ban_time,
unban_replicas_when_all_banned: true,
sharding_key_regex: Some(Regex::new(r"/\* sharding_key: (\d+) \*/").unwrap()),
shard_id_regex: Some(Regex::new(r"/\* shard_id: (\d+) \*/").unwrap()),
default_shard: crate::config::DefaultShard::Shard(0),

View File

@@ -56,6 +56,41 @@ describe "Random Load Balancing" do
end
end
end
context "when all replicas are down " do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length
# Take down all replicas
processes[:replicas].each(&:take_down)
(number_of_replicas + 1).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(number_of_replicas + 1)
failed_count = 0
# Ban_time is configured to 60 so this reset will only work
# if the replicas are unbanned automatically
processes[:replicas].each(&:reset)
number_of_replicas.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(0)
end
end
end
describe "Least Outstanding Queries Load Balancing" do
@@ -161,4 +196,3 @@ describe "Least Outstanding Queries Load Balancing" do
end
end
end