mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-24 01:36:29 +00:00
Compare commits
12 Commits
circleci_O
...
circleci_g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2f8eb58bdb | ||
|
|
328c703e4a | ||
|
|
da044e8fa5 | ||
|
|
3796e26402 | ||
|
|
0ee59c0c40 | ||
|
|
b61d2cc6f0 | ||
|
|
c11418c083 | ||
|
|
c9544bdff2 | ||
|
|
cdcfa99fb9 | ||
|
|
f27dc6b483 | ||
|
|
326efc22b3 | ||
|
|
01c6afb2e5 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -1525,9 +1525,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlparser"
|
name = "sqlparser"
|
||||||
version = "0.41.0"
|
version = "0.52.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
"sqlparser_derive",
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
|||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
sqlparser = { version = "0.52", features = ["visitor"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
parking_lot = "0.12.1"
|
parking_lot = "0.12.1"
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ apiVersion: v2
|
|||||||
name: pgcat
|
name: pgcat
|
||||||
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
||||||
maintainers:
|
maintainers:
|
||||||
- name: Wildcard
|
- name: PostgresML
|
||||||
email: support@w6d.io
|
email: team@postgresml.org
|
||||||
appVersion: "1.2.0"
|
appVersion: "1.2.0"
|
||||||
version: 0.2.1
|
version: 0.2.5
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ primary_reads_enabled = true
|
|||||||
# `random`: picks a shard at random
|
# `random`: picks a shard at random
|
||||||
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
||||||
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
||||||
# no_shard_specified_behavior = "shard_0"
|
# default_shard = "shard_0"
|
||||||
|
|
||||||
# So what if you wanted to implement a different hashing function,
|
# So what if you wanted to implement a different hashing function,
|
||||||
# or you've already built one and you want this pooler to use it?
|
# or you've already built one and you want this pooler to use it?
|
||||||
|
|||||||
@@ -881,6 +881,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
query_router.update_pool_settings(&pool.settings);
|
query_router.update_pool_settings(&pool.settings);
|
||||||
|
query_router.set_default_role();
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
|
|||||||
@@ -504,55 +504,33 @@ impl QueryRouter {
|
|||||||
let mut table_names = Vec::new();
|
let mut table_names = Vec::new();
|
||||||
|
|
||||||
match q {
|
match q {
|
||||||
Insert {
|
Insert(i) => {
|
||||||
or,
|
|
||||||
into: _,
|
|
||||||
table_name,
|
|
||||||
columns,
|
|
||||||
overwrite: _,
|
|
||||||
source,
|
|
||||||
partitioned,
|
|
||||||
after_columns,
|
|
||||||
table: _,
|
|
||||||
on: _,
|
|
||||||
returning: _,
|
|
||||||
ignore: _,
|
|
||||||
} => {
|
|
||||||
// Not supported in postgres.
|
// Not supported in postgres.
|
||||||
assert!(or.is_none());
|
assert!(i.or.is_none());
|
||||||
assert!(partitioned.is_none());
|
assert!(i.partitioned.is_none());
|
||||||
assert!(after_columns.is_empty());
|
assert!(i.after_columns.is_empty());
|
||||||
|
|
||||||
Self::process_table(table_name, &mut table_names);
|
Self::process_table(&i.table_name, &mut table_names);
|
||||||
if let Some(source) = source {
|
if let Some(source) = &i.source {
|
||||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Delete {
|
Delete(d) => {
|
||||||
tables,
|
if let Some(expr) = &d.selection {
|
||||||
from,
|
|
||||||
using,
|
|
||||||
selection,
|
|
||||||
returning: _,
|
|
||||||
order_by: _,
|
|
||||||
limit: _,
|
|
||||||
} => {
|
|
||||||
if let Some(expr) = selection {
|
|
||||||
exprs.push(expr.clone());
|
exprs.push(expr.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Multi tables delete are not supported in postgres.
|
// Multi tables delete are not supported in postgres.
|
||||||
assert!(tables.is_empty());
|
assert!(d.tables.is_empty());
|
||||||
|
|
||||||
Self::process_tables_with_join(from, &mut exprs, &mut table_names);
|
if let Some(using_tbl_with_join) = &d.using {
|
||||||
if let Some(using_tbl_with_join) = using {
|
|
||||||
Self::process_tables_with_join(
|
Self::process_tables_with_join(
|
||||||
using_tbl_with_join,
|
using_tbl_with_join,
|
||||||
&mut exprs,
|
&mut exprs,
|
||||||
&mut table_names,
|
&mut table_names,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Self::process_selection(selection, &mut exprs);
|
Self::process_selection(&d.selection, &mut exprs);
|
||||||
}
|
}
|
||||||
Update {
|
Update {
|
||||||
table,
|
table,
|
||||||
@@ -822,7 +800,13 @@ impl QueryRouter {
|
|||||||
|
|
||||||
for a in assignments {
|
for a in assignments {
|
||||||
if sharding_key[0].value == "*"
|
if sharding_key[0].value == "*"
|
||||||
&& sharding_key[1].value == a.id.last().unwrap().value.to_lowercase()
|
&& sharding_key[1].value
|
||||||
|
== a.target
|
||||||
|
.to_string()
|
||||||
|
.split('.')
|
||||||
|
.last()
|
||||||
|
.unwrap()
|
||||||
|
.to_lowercase()
|
||||||
{
|
{
|
||||||
return Err(Error::QueryRouterParserError(
|
return Err(Error::QueryRouterParserError(
|
||||||
"Sharding key cannot be updated.".into(),
|
"Sharding key cannot be updated.".into(),
|
||||||
@@ -1061,6 +1045,11 @@ impl QueryRouter {
|
|||||||
self.active_shard
|
self.active_shard
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set active_role as the default_role specified in the pool.
|
||||||
|
pub fn set_default_role(&mut self) {
|
||||||
|
self.active_role = self.pool_settings.default_role;
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the current desired server role we should be talking to.
|
/// Get the current desired server role we should be talking to.
|
||||||
pub fn role(&self) -> Option<Role> {
|
pub fn role(&self) -> Option<Role> {
|
||||||
self.active_role
|
self.active_role
|
||||||
|
|||||||
@@ -56,6 +56,41 @@ describe "Random Load Balancing" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "when all replicas are down " do
|
||||||
|
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
|
||||||
|
|
||||||
|
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count = 0
|
||||||
|
number_of_replicas = processes[:replicas].length
|
||||||
|
|
||||||
|
# Take down all replicas
|
||||||
|
processes[:replicas].each(&:take_down)
|
||||||
|
|
||||||
|
(number_of_replicas + 1).times do |n|
|
||||||
|
conn.async_exec("SELECT 1 + 2")
|
||||||
|
rescue
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count += 1
|
||||||
|
end
|
||||||
|
|
||||||
|
expect(failed_count).to eq(number_of_replicas + 1)
|
||||||
|
failed_count = 0
|
||||||
|
|
||||||
|
# Ban_time is configured to 60 so this reset will only work
|
||||||
|
# if the replicas are unbanned automatically
|
||||||
|
processes[:replicas].each(&:reset)
|
||||||
|
|
||||||
|
number_of_replicas.times do
|
||||||
|
conn.async_exec("SELECT 1 + 2")
|
||||||
|
rescue
|
||||||
|
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||||
|
failed_count += 1
|
||||||
|
end
|
||||||
|
expect(failed_count).to eq(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Least Outstanding Queries Load Balancing" do
|
describe "Least Outstanding Queries Load Balancing" do
|
||||||
@@ -161,4 +196,3 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user