Compare commits

..

1 Commits

Author SHA1 Message Date
Javier Goday
f782206578 #829: read/write splitting on CTE mutable statements 2024-10-16 20:12:25 +02:00
9 changed files with 17 additions and 58 deletions

1
.gitignore vendored
View File

@@ -12,4 +12,3 @@ dev/cache
!dev/cache/.keepme
.venv
**/__pycache__
.bundle

9
Cargo.lock generated
View File

@@ -192,11 +192,12 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
[[package]]
name = "bb8"
version = "0.8.6"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
dependencies = [
"async-trait",
"futures-channel",
"futures-util",
"parking_lot",
"tokio",
@@ -1525,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.52.0"
version = "0.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
dependencies = [
"log",
"sqlparser_derive",

View File

@@ -8,7 +8,7 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
bytes = "1"
md-5 = "0.10"
bb8 = "=0.8.6"
bb8 = "0.8.1"
async-trait = "0.1"
rand = "0.8"
chrono = "0.4"
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = { version = "0.52", features = ["visitor"] }
sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"

View File

@@ -2,7 +2,7 @@ apiVersion: v2
name: pgcat
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
maintainers:
- name: PostgresML
email: team@postgresml.org
- name: Wildcard
email: support@w6d.io
appVersion: "1.2.0"
version: 0.2.5
version: 0.2.1

View File

@@ -1,2 +1 @@
sign: false
pages_branch: main

View File

@@ -179,7 +179,7 @@ primary_reads_enabled = true
# `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# default_shard = "shard_0"
# no_shard_specified_behavior = "shard_0"
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?

View File

@@ -881,7 +881,6 @@ where
};
query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries

View File

@@ -386,14 +386,14 @@ impl QueryRouter {
}
}
/// Determines if a query is a mutation or not.
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
/// Determines if a query is mutable or not.
fn query_is_mutable_statement(q: &sqlparser::ast::Query) -> bool {
use sqlparser::ast::*;
match q.body.as_ref() {
SetExpr::Insert(_) => true,
SetExpr::Update(_) => true,
SetExpr::Query(q) => Self::is_mutation_query(q),
SetExpr::Query(q) => Self::query_is_mutable_statement(q),
_ => false,
}
}
@@ -440,9 +440,9 @@ impl QueryRouter {
};
let has_locks = !query.locks.is_empty();
let has_mutation = Self::is_mutation_query(query);
let is_mutable_statement = Self::query_is_mutable_statement(query);
if has_locks || has_mutation {
if has_locks || is_mutable_statement {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary.
@@ -1061,11 +1061,6 @@ impl QueryRouter {
self.active_shard
}
/// Set active_role as the default_role specified in the pool.
pub fn set_default_role(&mut self) {
self.active_role = self.pool_settings.default_role;
}
/// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> {
self.active_role

View File

@@ -56,41 +56,6 @@ describe "Random Load Balancing" do
end
end
end
context "when all replicas are down " do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length
# Take down all replicas
processes[:replicas].each(&:take_down)
(number_of_replicas + 1).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(number_of_replicas + 1)
failed_count = 0
# Ban_time is configured to 60 so this reset will only work
# if the replicas are unbanned automatically
processes[:replicas].each(&:reset)
number_of_replicas.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(0)
end
end
end
describe "Least Outstanding Queries Load Balancing" do
@@ -196,3 +161,4 @@ describe "Least Outstanding Queries Load Balancing" do
end
end
end