Compare commits

..

2 Commits

Author SHA1 Message Date
Vita Tauer
c0ae53484f Fixed linter errors 2024-11-11 12:09:19 +01:00
Vita Tauer
a1690a76c5 Add 'Fail' mode when no shard selected 2024-11-11 12:04:15 +01:00
8 changed files with 47 additions and 25 deletions

4
Cargo.lock generated
View File

@@ -1525,9 +1525,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "sqlparser" name = "sqlparser"
version = "0.52.0" version = "0.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08" checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
dependencies = [ dependencies = [
"log", "log",
"sqlparser_derive", "sqlparser_derive",

View File

@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1" regex = "1"
num_cpus = "1" num_cpus = "1"
once_cell = "1" once_cell = "1"
sqlparser = { version = "0.52", features = ["visitor"] } sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"
parking_lot = "0.12.1" parking_lot = "0.12.1"

View File

@@ -2,7 +2,7 @@ apiVersion: v2
name: pgcat name: pgcat
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring. description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
maintainers: maintainers:
- name: PostgresML - name: Wildcard
email: team@postgresml.org email: support@w6d.io
appVersion: "1.2.0" appVersion: "1.2.0"
version: 0.2.5 version: 0.2.4

View File

@@ -179,6 +179,7 @@ primary_reads_enabled = true
# `random`: picks a shard at random # `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors # `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime # `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# `fail`: fails to pick up shard. (require explicit shard setup)
# default_shard = "shard_0" # default_shard = "shard_0"
# So what if you wanted to implement a different hashing function, # So what if you wanted to implement a different hashing function,

View File

@@ -773,6 +773,7 @@ pub enum DefaultShard {
Shard(usize), Shard(usize),
Random, Random,
RandomHealthy, RandomHealthy,
Fail,
} }
impl Default for DefaultShard { impl Default for DefaultShard {
fn default() -> Self { fn default() -> Self {
@@ -787,6 +788,7 @@ impl serde::Serialize for DefaultShard {
} }
DefaultShard::Random => serializer.serialize_str("random"), DefaultShard::Random => serializer.serialize_str("random"),
DefaultShard::RandomHealthy => serializer.serialize_str("random_healthy"), DefaultShard::RandomHealthy => serializer.serialize_str("random_healthy"),
DefaultShard::Fail => serializer.serialize_str("fail"),
} }
} }
} }
@@ -804,6 +806,7 @@ impl<'de> serde::Deserialize<'de> for DefaultShard {
match s.as_str() { match s.as_str() {
"random" => Ok(DefaultShard::Random), "random" => Ok(DefaultShard::Random),
"random_healthy" => Ok(DefaultShard::RandomHealthy), "random_healthy" => Ok(DefaultShard::RandomHealthy),
"fail" => Ok(DefaultShard::Fail),
_ => Err(serde::de::Error::custom( _ => Err(serde::de::Error::custom(
"invalid value for no_shard_specified_behavior", "invalid value for no_shard_specified_behavior",
)), )),

View File

@@ -30,6 +30,7 @@ pub enum Error {
QueryRouterError(String), QueryRouterError(String),
InvalidShardId(usize), InvalidShardId(usize),
PreparedStatementError, PreparedStatementError,
NoShardSelected,
} }
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]

View File

@@ -720,6 +720,7 @@ impl ConnectionPool {
.unwrap() .unwrap()
}); });
} }
DefaultShard::Fail => return Err(Error::NoShardSelected),
}, },
}; };

View File

@@ -504,33 +504,55 @@ impl QueryRouter {
let mut table_names = Vec::new(); let mut table_names = Vec::new();
match q { match q {
Insert(i) => { Insert {
or,
into: _,
table_name,
columns,
overwrite: _,
source,
partitioned,
after_columns,
table: _,
on: _,
returning: _,
ignore: _,
} => {
// Not supported in postgres. // Not supported in postgres.
assert!(i.or.is_none()); assert!(or.is_none());
assert!(i.partitioned.is_none()); assert!(partitioned.is_none());
assert!(i.after_columns.is_empty()); assert!(after_columns.is_empty());
Self::process_table(&i.table_name, &mut table_names); Self::process_table(table_name, &mut table_names);
if let Some(source) = &i.source { if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns)); Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
} }
} }
Delete(d) => { Delete {
if let Some(expr) = &d.selection { tables,
from,
using,
selection,
returning: _,
order_by: _,
limit: _,
} => {
if let Some(expr) = selection {
exprs.push(expr.clone()); exprs.push(expr.clone());
} }
// Multi tables delete are not supported in postgres. // Multi tables delete are not supported in postgres.
assert!(d.tables.is_empty()); assert!(tables.is_empty());
if let Some(using_tbl_with_join) = &d.using { Self::process_tables_with_join(from, &mut exprs, &mut table_names);
if let Some(using_tbl_with_join) = using {
Self::process_tables_with_join( Self::process_tables_with_join(
using_tbl_with_join, using_tbl_with_join,
&mut exprs, &mut exprs,
&mut table_names, &mut table_names,
); );
} }
Self::process_selection(&d.selection, &mut exprs); Self::process_selection(selection, &mut exprs);
} }
Update { Update {
table, table,
@@ -800,13 +822,7 @@ impl QueryRouter {
for a in assignments { for a in assignments {
if sharding_key[0].value == "*" if sharding_key[0].value == "*"
&& sharding_key[1].value && sharding_key[1].value == a.id.last().unwrap().value.to_lowercase()
== a.target
.to_string()
.split('.')
.last()
.unwrap()
.to_lowercase()
{ {
return Err(Error::QueryRouterParserError( return Err(Error::QueryRouterParserError(
"Sharding key cannot be updated.".into(), "Sharding key cannot be updated.".into(),