mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
Compare commits
3 Commits
circleci_g
...
circleci_A
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fba40eba2f | ||
|
|
d8ccf4babb | ||
|
|
feedcd49d9 |
4
.github/workflows/chart-lint-test.yaml
vendored
4
.github/workflows/chart-lint-test.yaml
vendored
@@ -22,7 +22,7 @@ jobs:
|
||||
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.1.0
|
||||
uses: actions/setup-python@v4.1.0
|
||||
with:
|
||||
python-version: 3.7
|
||||
|
||||
@@ -43,7 +43,7 @@ jobs:
|
||||
run: ct lint --config ct.yaml
|
||||
|
||||
- name: Create kind cluster
|
||||
uses: helm/kind-action@v1.10.0
|
||||
uses: helm/kind-action@v1.7.0
|
||||
if: steps.list-changed.outputs.changed == 'true'
|
||||
|
||||
- name: Run chart-testing (install)
|
||||
|
||||
2
.github/workflows/chart-release.yaml
vendored
2
.github/workflows/chart-release.yaml
vendored
@@ -32,7 +32,7 @@ jobs:
|
||||
version: v3.13.0
|
||||
|
||||
- name: Run chart-releaser
|
||||
uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
|
||||
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
|
||||
with:
|
||||
charts_dir: charts
|
||||
config: cr.yaml
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -12,4 +12,3 @@ dev/cache
|
||||
!dev/cache/.keepme
|
||||
.venv
|
||||
**/__pycache__
|
||||
.bundle
|
||||
26
CONFIG.md
26
CONFIG.md
@@ -36,11 +36,10 @@ Port at which prometheus exporter listens on.
|
||||
### connect_timeout
|
||||
```
|
||||
path: general.connect_timeout
|
||||
default: 1000 # milliseconds
|
||||
default: 5000 # milliseconds
|
||||
```
|
||||
|
||||
How long the client waits to obtain a server connection before aborting (ms).
|
||||
This is similar to PgBouncer's `query_wait_timeout`.
|
||||
How long to wait before aborting a server connection (ms).
|
||||
|
||||
### idle_timeout
|
||||
```
|
||||
@@ -463,18 +462,10 @@ path: pools.<pool_name>.users.<user_index>.pool_size
|
||||
default: 9
|
||||
```
|
||||
|
||||
Maximum number of server connections that can be established for this user.
|
||||
Maximum number of server connections that can be established for this user
|
||||
The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||
is the sum of pool_size across all users.
|
||||
|
||||
### min_pool_size
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.min_pool_size
|
||||
default: 0
|
||||
```
|
||||
|
||||
Minimum number of idle server connections to retain for this pool.
|
||||
|
||||
### statement_timeout
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
||||
@@ -484,16 +475,6 @@ default: 0
|
||||
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
0 means it is disabled.
|
||||
|
||||
### connect_timeout
|
||||
```
|
||||
path: pools.<pool_name>.users.<user_index>.connect_timeout
|
||||
default: <UNSET> # milliseconds
|
||||
```
|
||||
|
||||
How long the client waits to obtain a server connection before aborting (ms).
|
||||
This is similar to PgBouncer's `query_wait_timeout`.
|
||||
If unset, uses the `connect_timeout` defined globally.
|
||||
|
||||
## `pools.<pool_name>.shards.<shard_index>` Section
|
||||
|
||||
### servers
|
||||
@@ -521,3 +502,4 @@ default: "shard0"
|
||||
```
|
||||
|
||||
Database name (e.g. "postgres")
|
||||
|
||||
|
||||
9
Cargo.lock
generated
9
Cargo.lock
generated
@@ -192,11 +192,12 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
|
||||
|
||||
[[package]]
|
||||
name = "bb8"
|
||||
version = "0.8.6"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
|
||||
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"parking_lot",
|
||||
"tokio",
|
||||
@@ -1525,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.52.0"
|
||||
version = "0.41.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a875d8cd437cc8a97e9aeaeea352ec9a19aea99c23e9effb17757291de80b08"
|
||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
||||
dependencies = [
|
||||
"log",
|
||||
"sqlparser_derive",
|
||||
|
||||
@@ -8,7 +8,7 @@ edition = "2021"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
bytes = "1"
|
||||
md-5 = "0.10"
|
||||
bb8 = "=0.8.6"
|
||||
bb8 = "0.8.1"
|
||||
async-trait = "0.1"
|
||||
rand = "0.8"
|
||||
chrono = "0.4"
|
||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
sqlparser = { version = "0.52", features = ["visitor"] }
|
||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
parking_lot = "0.12.1"
|
||||
|
||||
@@ -2,7 +2,7 @@ apiVersion: v2
|
||||
name: pgcat
|
||||
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
|
||||
maintainers:
|
||||
- name: PostgresML
|
||||
email: team@postgresml.org
|
||||
- name: Wildcard
|
||||
email: support@w6d.io
|
||||
appVersion: "1.2.0"
|
||||
version: 0.2.5
|
||||
version: 0.2.0
|
||||
|
||||
@@ -15,7 +15,6 @@ stringData:
|
||||
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
||||
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
||||
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
||||
server_tls = {{ .Values.configuration.general.server_tls }}
|
||||
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
||||
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
||||
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
||||
@@ -59,21 +58,11 @@ stringData:
|
||||
##
|
||||
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
||||
username = {{ $user.username | quote }}
|
||||
{{- if $user.password }}
|
||||
password = {{ $user.password | quote }}
|
||||
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
|
||||
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
|
||||
{{- if $secret }}
|
||||
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
|
||||
password = {{ $password | quote }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
pool_size = {{ $user.pool_size }}
|
||||
statement_timeout = {{ default 0 $user.statement_timeout }}
|
||||
min_pool_size = {{ default 3 $user.min_pool_size }}
|
||||
{{- if $user.server_lifetime }}
|
||||
server_lifetime = {{ $user.server_lifetime }}
|
||||
{{- end }}
|
||||
statement_timeout = {{ $user.statement_timeout }}
|
||||
min_pool_size = 3
|
||||
server_lifetime = 60000
|
||||
{{- if and $user.server_username $user.server_password }}
|
||||
server_username = {{ $user.server_username | quote }}
|
||||
server_password = {{ $user.server_password | quote }}
|
||||
|
||||
@@ -175,9 +175,6 @@ configuration:
|
||||
# Max connection lifetime before it's closed, even if actively used.
|
||||
server_lifetime: 86400000 # 24 hours
|
||||
|
||||
# Whether to use TLS for server connections or not.
|
||||
server_tls: false
|
||||
|
||||
# How long a client is allowed to be idle while in a transaction (ms).
|
||||
idle_client_in_transaction_timeout: 0 # milliseconds
|
||||
|
||||
@@ -318,9 +315,7 @@ configuration:
|
||||
# ## Credentials for users that may connect to this cluster
|
||||
# ## @param users [array]
|
||||
# ## @param users[0].username Name of the env var (required)
|
||||
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
|
||||
# ## @param users[0].passwordSecret.name Name of the secret containing the password
|
||||
# ## @param users[0].passwordSecret.key Key in the secret containing the password
|
||||
# ## @param users[0].password Value for the env var (required)
|
||||
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
||||
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
# users: []
|
||||
|
||||
@@ -179,7 +179,7 @@ primary_reads_enabled = true
|
||||
# `random`: picks a shard at random
|
||||
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
|
||||
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
|
||||
# default_shard = "shard_0"
|
||||
# no_shard_specified_behavior = "shard_0"
|
||||
|
||||
# So what if you wanted to implement a different hashing function,
|
||||
# or you've already built one and you want this pooler to use it?
|
||||
|
||||
@@ -881,7 +881,6 @@ where
|
||||
};
|
||||
|
||||
query_router.update_pool_settings(&pool.settings);
|
||||
query_router.set_default_role();
|
||||
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
|
||||
@@ -309,7 +309,6 @@ async fn prometheus_stats(
|
||||
push_pool_stats(&mut lines);
|
||||
push_server_stats(&mut lines);
|
||||
push_database_stats(&mut lines);
|
||||
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
|
||||
|
||||
Response::builder()
|
||||
.header("content-type", "text/plain; version=0.0.4")
|
||||
|
||||
@@ -386,18 +386,6 @@ impl QueryRouter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines if a query is a mutation or not.
|
||||
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
|
||||
use sqlparser::ast::*;
|
||||
|
||||
match q.body.as_ref() {
|
||||
SetExpr::Insert(_) => true,
|
||||
SetExpr::Update(_) => true,
|
||||
SetExpr::Query(q) => Self::is_mutation_query(q),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to infer which server to connect to based on the contents of the query.
|
||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
||||
if !self.pool_settings.query_parser_read_write_splitting {
|
||||
@@ -440,9 +428,8 @@ impl QueryRouter {
|
||||
};
|
||||
|
||||
let has_locks = !query.locks.is_empty();
|
||||
let has_mutation = Self::is_mutation_query(query);
|
||||
|
||||
if has_locks || has_mutation {
|
||||
if has_locks {
|
||||
self.active_role = Some(Role::Primary);
|
||||
} else if !visited_write_statement {
|
||||
// If we already visited a write statement, we should be going to the primary.
|
||||
@@ -504,33 +491,55 @@ impl QueryRouter {
|
||||
let mut table_names = Vec::new();
|
||||
|
||||
match q {
|
||||
Insert(i) => {
|
||||
Insert {
|
||||
or,
|
||||
into: _,
|
||||
table_name,
|
||||
columns,
|
||||
overwrite: _,
|
||||
source,
|
||||
partitioned,
|
||||
after_columns,
|
||||
table: _,
|
||||
on: _,
|
||||
returning: _,
|
||||
ignore: _,
|
||||
} => {
|
||||
// Not supported in postgres.
|
||||
assert!(i.or.is_none());
|
||||
assert!(i.partitioned.is_none());
|
||||
assert!(i.after_columns.is_empty());
|
||||
assert!(or.is_none());
|
||||
assert!(partitioned.is_none());
|
||||
assert!(after_columns.is_empty());
|
||||
|
||||
Self::process_table(&i.table_name, &mut table_names);
|
||||
if let Some(source) = &i.source {
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns));
|
||||
Self::process_table(table_name, &mut table_names);
|
||||
if let Some(source) = source {
|
||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
||||
}
|
||||
}
|
||||
Delete(d) => {
|
||||
if let Some(expr) = &d.selection {
|
||||
Delete {
|
||||
tables,
|
||||
from,
|
||||
using,
|
||||
selection,
|
||||
returning: _,
|
||||
order_by: _,
|
||||
limit: _,
|
||||
} => {
|
||||
if let Some(expr) = selection {
|
||||
exprs.push(expr.clone());
|
||||
}
|
||||
|
||||
// Multi tables delete are not supported in postgres.
|
||||
assert!(d.tables.is_empty());
|
||||
assert!(tables.is_empty());
|
||||
|
||||
if let Some(using_tbl_with_join) = &d.using {
|
||||
Self::process_tables_with_join(from, &mut exprs, &mut table_names);
|
||||
if let Some(using_tbl_with_join) = using {
|
||||
Self::process_tables_with_join(
|
||||
using_tbl_with_join,
|
||||
&mut exprs,
|
||||
&mut table_names,
|
||||
);
|
||||
}
|
||||
Self::process_selection(&d.selection, &mut exprs);
|
||||
Self::process_selection(selection, &mut exprs);
|
||||
}
|
||||
Update {
|
||||
table,
|
||||
@@ -800,13 +809,7 @@ impl QueryRouter {
|
||||
|
||||
for a in assignments {
|
||||
if sharding_key[0].value == "*"
|
||||
&& sharding_key[1].value
|
||||
== a.target
|
||||
.to_string()
|
||||
.split('.')
|
||||
.last()
|
||||
.unwrap()
|
||||
.to_lowercase()
|
||||
&& sharding_key[1].value == a.id.last().unwrap().value.to_lowercase()
|
||||
{
|
||||
return Err(Error::QueryRouterParserError(
|
||||
"Sharding key cannot be updated.".into(),
|
||||
@@ -1045,11 +1048,6 @@ impl QueryRouter {
|
||||
self.active_shard
|
||||
}
|
||||
|
||||
/// Set active_role as the default_role specified in the pool.
|
||||
pub fn set_default_role(&mut self) {
|
||||
self.active_role = self.pool_settings.default_role;
|
||||
}
|
||||
|
||||
/// Get the current desired server role we should be talking to.
|
||||
pub fn role(&self) -> Option<Role> {
|
||||
self.active_role
|
||||
@@ -1115,26 +1113,6 @@ mod test {
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_split_cte_queries() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||
qr.pool_settings.query_parser_enabled = true;
|
||||
|
||||
let query = simple_query(
|
||||
"WITH t AS (
|
||||
SELECT id FROM users WHERE name ILIKE '%ja%'
|
||||
)
|
||||
UPDATE user_languages
|
||||
SET settings = '{}'
|
||||
FROM t WHERE t.id = user_id;",
|
||||
);
|
||||
let ast = qr.parse(&query).unwrap();
|
||||
assert!(qr.infer(&ast).is_ok());
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_replica() {
|
||||
QueryRouter::setup();
|
||||
|
||||
@@ -1,33 +1,22 @@
|
||||
GEM
|
||||
remote: https://rubygems.org/
|
||||
specs:
|
||||
activemodel (7.1.4)
|
||||
activesupport (= 7.1.4)
|
||||
activerecord (7.1.4)
|
||||
activemodel (= 7.1.4)
|
||||
activesupport (= 7.1.4)
|
||||
timeout (>= 0.4.0)
|
||||
activesupport (7.1.4)
|
||||
base64
|
||||
bigdecimal
|
||||
activemodel (7.0.4.1)
|
||||
activesupport (= 7.0.4.1)
|
||||
activerecord (7.0.4.1)
|
||||
activemodel (= 7.0.4.1)
|
||||
activesupport (= 7.0.4.1)
|
||||
activesupport (7.0.4.1)
|
||||
concurrent-ruby (~> 1.0, >= 1.0.2)
|
||||
connection_pool (>= 2.2.5)
|
||||
drb
|
||||
i18n (>= 1.6, < 2)
|
||||
minitest (>= 5.1)
|
||||
mutex_m
|
||||
tzinfo (~> 2.0)
|
||||
ast (2.4.2)
|
||||
base64 (0.2.0)
|
||||
bigdecimal (3.1.8)
|
||||
concurrent-ruby (1.3.4)
|
||||
connection_pool (2.4.1)
|
||||
concurrent-ruby (1.1.10)
|
||||
diff-lcs (1.5.0)
|
||||
drb (2.2.1)
|
||||
i18n (1.14.5)
|
||||
i18n (1.12.0)
|
||||
concurrent-ruby (~> 1.0)
|
||||
minitest (5.25.1)
|
||||
mutex_m (0.2.0)
|
||||
minitest (5.17.0)
|
||||
parallel (1.22.1)
|
||||
parser (3.1.2.0)
|
||||
ast (~> 2.4.1)
|
||||
@@ -35,8 +24,7 @@ GEM
|
||||
pg (1.3.2)
|
||||
rainbow (3.1.1)
|
||||
regexp_parser (2.3.1)
|
||||
rexml (3.3.6)
|
||||
strscan
|
||||
rexml (3.2.5)
|
||||
rspec (3.11.0)
|
||||
rspec-core (~> 3.11.0)
|
||||
rspec-expectations (~> 3.11.0)
|
||||
@@ -62,12 +50,10 @@ GEM
|
||||
rubocop-ast (1.17.0)
|
||||
parser (>= 3.1.1.0)
|
||||
ruby-progressbar (1.11.0)
|
||||
strscan (3.1.0)
|
||||
timeout (0.4.1)
|
||||
toml (0.3.0)
|
||||
parslet (>= 1.8.0, < 3.0.0)
|
||||
toxiproxy (2.0.1)
|
||||
tzinfo (2.0.6)
|
||||
tzinfo (2.0.5)
|
||||
concurrent-ruby (~> 1.0)
|
||||
unicode-display_width (2.1.0)
|
||||
|
||||
|
||||
@@ -56,41 +56,6 @@ describe "Random Load Balancing" do
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when all replicas are down " do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
|
||||
|
||||
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count = 0
|
||||
number_of_replicas = processes[:replicas].length
|
||||
|
||||
# Take down all replicas
|
||||
processes[:replicas].each(&:take_down)
|
||||
|
||||
(number_of_replicas + 1).times do |n|
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(number_of_replicas + 1)
|
||||
failed_count = 0
|
||||
|
||||
# Ban_time is configured to 60 so this reset will only work
|
||||
# if the replicas are unbanned automatically
|
||||
processes[:replicas].each(&:reset)
|
||||
|
||||
number_of_replicas.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
expect(failed_count).to eq(0)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Least Outstanding Queries Load Balancing" do
|
||||
@@ -196,3 +161,4 @@ describe "Least Outstanding Queries Load Balancing" do
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
Reference in New Issue
Block a user