mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
Compare commits
1 Commits
circleci_O
...
mostafa_sq
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6e11e11da |
2
.github/workflows/chart-release.yaml
vendored
2
.github/workflows/chart-release.yaml
vendored
@@ -32,7 +32,7 @@ jobs:
|
|||||||
version: v3.13.0
|
version: v3.13.0
|
||||||
|
|
||||||
- name: Run chart-releaser
|
- name: Run chart-releaser
|
||||||
uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
|
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
|
||||||
with:
|
with:
|
||||||
charts_dir: charts
|
charts_dir: charts
|
||||||
config: cr.yaml
|
config: cr.yaml
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -12,4 +12,3 @@ dev/cache
|
|||||||
!dev/cache/.keepme
|
!dev/cache/.keepme
|
||||||
.venv
|
.venv
|
||||||
**/__pycache__
|
**/__pycache__
|
||||||
.bundle
|
|
||||||
26
CONFIG.md
26
CONFIG.md
@@ -36,11 +36,10 @@ Port at which prometheus exporter listens on.
|
|||||||
### connect_timeout
|
### connect_timeout
|
||||||
```
|
```
|
||||||
path: general.connect_timeout
|
path: general.connect_timeout
|
||||||
default: 1000 # milliseconds
|
default: 5000 # milliseconds
|
||||||
```
|
```
|
||||||
|
|
||||||
How long the client waits to obtain a server connection before aborting (ms).
|
How long to wait before aborting a server connection (ms).
|
||||||
This is similar to PgBouncer's `query_wait_timeout`.
|
|
||||||
|
|
||||||
### idle_timeout
|
### idle_timeout
|
||||||
```
|
```
|
||||||
@@ -463,18 +462,10 @@ path: pools.<pool_name>.users.<user_index>.pool_size
|
|||||||
default: 9
|
default: 9
|
||||||
```
|
```
|
||||||
|
|
||||||
Maximum number of server connections that can be established for this user.
|
Maximum number of server connections that can be established for this user
|
||||||
The maximum number of connection from a single Pgcat process to any database in the cluster
|
The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||||
is the sum of pool_size across all users.
|
is the sum of pool_size across all users.
|
||||||
|
|
||||||
### min_pool_size
|
|
||||||
```
|
|
||||||
path: pools.<pool_name>.users.<user_index>.min_pool_size
|
|
||||||
default: 0
|
|
||||||
```
|
|
||||||
|
|
||||||
Minimum number of idle server connections to retain for this pool.
|
|
||||||
|
|
||||||
### statement_timeout
|
### statement_timeout
|
||||||
```
|
```
|
||||||
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
path: pools.<pool_name>.users.<user_index>.statement_timeout
|
||||||
@@ -484,16 +475,6 @@ default: 0
|
|||||||
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
0 means it is disabled.
|
0 means it is disabled.
|
||||||
|
|
||||||
### connect_timeout
|
|
||||||
```
|
|
||||||
path: pools.<pool_name>.users.<user_index>.connect_timeout
|
|
||||||
default: <UNSET> # milliseconds
|
|
||||||
```
|
|
||||||
|
|
||||||
How long the client waits to obtain a server connection before aborting (ms).
|
|
||||||
This is similar to PgBouncer's `query_wait_timeout`.
|
|
||||||
If unset, uses the `connect_timeout` defined globally.
|
|
||||||
|
|
||||||
## `pools.<pool_name>.shards.<shard_index>` Section
|
## `pools.<pool_name>.shards.<shard_index>` Section
|
||||||
|
|
||||||
### servers
|
### servers
|
||||||
@@ -521,3 +502,4 @@ default: "shard0"
|
|||||||
```
|
```
|
||||||
|
|
||||||
Database name (e.g. "postgres")
|
Database name (e.g. "postgres")
|
||||||
|
|
||||||
|
|||||||
9
Cargo.lock
generated
9
Cargo.lock
generated
@@ -192,11 +192,12 @@ checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bb8"
|
name = "bb8"
|
||||||
version = "0.8.6"
|
version = "0.8.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
|
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"futures-channel",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"tokio",
|
"tokio",
|
||||||
@@ -1525,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlparser"
|
name = "sqlparser"
|
||||||
version = "0.41.0"
|
version = "0.51.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
"sqlparser_derive",
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ edition = "2021"
|
|||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
md-5 = "0.10"
|
md-5 = "0.10"
|
||||||
bb8 = "=0.8.6"
|
bb8 = "0.8.1"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
|||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
sqlparser = { version = "0.51", features = ["visitor"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
parking_lot = "0.12.1"
|
parking_lot = "0.12.1"
|
||||||
|
|||||||
@@ -5,4 +5,4 @@ maintainers:
|
|||||||
- name: Wildcard
|
- name: Wildcard
|
||||||
email: support@w6d.io
|
email: support@w6d.io
|
||||||
appVersion: "1.2.0"
|
appVersion: "1.2.0"
|
||||||
version: 0.2.1
|
version: 0.2.0
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ stringData:
|
|||||||
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
|
||||||
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
|
||||||
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
|
||||||
server_tls = {{ .Values.configuration.general.server_tls }}
|
|
||||||
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
|
||||||
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
|
||||||
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
|
||||||
@@ -59,21 +58,11 @@ stringData:
|
|||||||
##
|
##
|
||||||
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
|
||||||
username = {{ $user.username | quote }}
|
username = {{ $user.username | quote }}
|
||||||
{{- if $user.password }}
|
|
||||||
password = {{ $user.password | quote }}
|
password = {{ $user.password | quote }}
|
||||||
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
|
|
||||||
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
|
|
||||||
{{- if $secret }}
|
|
||||||
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
|
|
||||||
password = {{ $password | quote }}
|
|
||||||
{{- end }}
|
|
||||||
{{- end }}
|
|
||||||
pool_size = {{ $user.pool_size }}
|
pool_size = {{ $user.pool_size }}
|
||||||
statement_timeout = {{ default 0 $user.statement_timeout }}
|
statement_timeout = {{ $user.statement_timeout }}
|
||||||
min_pool_size = {{ default 3 $user.min_pool_size }}
|
min_pool_size = 3
|
||||||
{{- if $user.server_lifetime }}
|
server_lifetime = 60000
|
||||||
server_lifetime = {{ $user.server_lifetime }}
|
|
||||||
{{- end }}
|
|
||||||
{{- if and $user.server_username $user.server_password }}
|
{{- if and $user.server_username $user.server_password }}
|
||||||
server_username = {{ $user.server_username | quote }}
|
server_username = {{ $user.server_username | quote }}
|
||||||
server_password = {{ $user.server_password | quote }}
|
server_password = {{ $user.server_password | quote }}
|
||||||
|
|||||||
@@ -175,9 +175,6 @@ configuration:
|
|||||||
# Max connection lifetime before it's closed, even if actively used.
|
# Max connection lifetime before it's closed, even if actively used.
|
||||||
server_lifetime: 86400000 # 24 hours
|
server_lifetime: 86400000 # 24 hours
|
||||||
|
|
||||||
# Whether to use TLS for server connections or not.
|
|
||||||
server_tls: false
|
|
||||||
|
|
||||||
# How long a client is allowed to be idle while in a transaction (ms).
|
# How long a client is allowed to be idle while in a transaction (ms).
|
||||||
idle_client_in_transaction_timeout: 0 # milliseconds
|
idle_client_in_transaction_timeout: 0 # milliseconds
|
||||||
|
|
||||||
@@ -318,9 +315,7 @@ configuration:
|
|||||||
# ## Credentials for users that may connect to this cluster
|
# ## Credentials for users that may connect to this cluster
|
||||||
# ## @param users [array]
|
# ## @param users [array]
|
||||||
# ## @param users[0].username Name of the env var (required)
|
# ## @param users[0].username Name of the env var (required)
|
||||||
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
|
# ## @param users[0].password Value for the env var (required)
|
||||||
# ## @param users[0].passwordSecret.name Name of the secret containing the password
|
|
||||||
# ## @param users[0].passwordSecret.key Key in the secret containing the password
|
|
||||||
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
|
||||||
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
# users: []
|
# users: []
|
||||||
|
|||||||
@@ -881,7 +881,6 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
query_router.update_pool_settings(&pool.settings);
|
query_router.update_pool_settings(&pool.settings);
|
||||||
query_router.set_default_role();
|
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
|
|||||||
@@ -309,7 +309,6 @@ async fn prometheus_stats(
|
|||||||
push_pool_stats(&mut lines);
|
push_pool_stats(&mut lines);
|
||||||
push_server_stats(&mut lines);
|
push_server_stats(&mut lines);
|
||||||
push_database_stats(&mut lines);
|
push_database_stats(&mut lines);
|
||||||
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
|
|
||||||
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.header("content-type", "text/plain; version=0.0.4")
|
.header("content-type", "text/plain; version=0.0.4")
|
||||||
|
|||||||
@@ -386,18 +386,6 @@ impl QueryRouter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Determines if a query is a mutation or not.
|
|
||||||
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
|
|
||||||
use sqlparser::ast::*;
|
|
||||||
|
|
||||||
match q.body.as_ref() {
|
|
||||||
SetExpr::Insert(_) => true,
|
|
||||||
SetExpr::Update(_) => true,
|
|
||||||
SetExpr::Query(q) => Self::is_mutation_query(q),
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to infer which server to connect to based on the contents of the query.
|
/// Try to infer which server to connect to based on the contents of the query.
|
||||||
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
|
||||||
if !self.pool_settings.query_parser_read_write_splitting {
|
if !self.pool_settings.query_parser_read_write_splitting {
|
||||||
@@ -440,9 +428,8 @@ impl QueryRouter {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let has_locks = !query.locks.is_empty();
|
let has_locks = !query.locks.is_empty();
|
||||||
let has_mutation = Self::is_mutation_query(query);
|
|
||||||
|
|
||||||
if has_locks || has_mutation {
|
if has_locks {
|
||||||
self.active_role = Some(Role::Primary);
|
self.active_role = Some(Role::Primary);
|
||||||
} else if !visited_write_statement {
|
} else if !visited_write_statement {
|
||||||
// If we already visited a write statement, we should be going to the primary.
|
// If we already visited a write statement, we should be going to the primary.
|
||||||
@@ -1061,11 +1048,6 @@ impl QueryRouter {
|
|||||||
self.active_shard
|
self.active_shard
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set active_role as the default_role specified in the pool.
|
|
||||||
pub fn set_default_role(&mut self) {
|
|
||||||
self.active_role = self.pool_settings.default_role;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the current desired server role we should be talking to.
|
/// Get the current desired server role we should be talking to.
|
||||||
pub fn role(&self) -> Option<Role> {
|
pub fn role(&self) -> Option<Role> {
|
||||||
self.active_role
|
self.active_role
|
||||||
@@ -1131,26 +1113,6 @@ mod test {
|
|||||||
assert_eq!(qr.role(), None);
|
assert_eq!(qr.role(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_split_cte_queries() {
|
|
||||||
QueryRouter::setup();
|
|
||||||
let mut qr = QueryRouter::new();
|
|
||||||
qr.pool_settings.query_parser_read_write_splitting = true;
|
|
||||||
qr.pool_settings.query_parser_enabled = true;
|
|
||||||
|
|
||||||
let query = simple_query(
|
|
||||||
"WITH t AS (
|
|
||||||
SELECT id FROM users WHERE name ILIKE '%ja%'
|
|
||||||
)
|
|
||||||
UPDATE user_languages
|
|
||||||
SET settings = '{}'
|
|
||||||
FROM t WHERE t.id = user_id;",
|
|
||||||
);
|
|
||||||
let ast = qr.parse(&query).unwrap();
|
|
||||||
assert!(qr.infer(&ast).is_ok());
|
|
||||||
assert_eq!(qr.role(), Some(Role::Primary));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_infer_replica() {
|
fn test_infer_replica() {
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
|
|||||||
@@ -1,33 +1,22 @@
|
|||||||
GEM
|
GEM
|
||||||
remote: https://rubygems.org/
|
remote: https://rubygems.org/
|
||||||
specs:
|
specs:
|
||||||
activemodel (7.1.4)
|
activemodel (7.0.4.1)
|
||||||
activesupport (= 7.1.4)
|
activesupport (= 7.0.4.1)
|
||||||
activerecord (7.1.4)
|
activerecord (7.0.4.1)
|
||||||
activemodel (= 7.1.4)
|
activemodel (= 7.0.4.1)
|
||||||
activesupport (= 7.1.4)
|
activesupport (= 7.0.4.1)
|
||||||
timeout (>= 0.4.0)
|
activesupport (7.0.4.1)
|
||||||
activesupport (7.1.4)
|
|
||||||
base64
|
|
||||||
bigdecimal
|
|
||||||
concurrent-ruby (~> 1.0, >= 1.0.2)
|
concurrent-ruby (~> 1.0, >= 1.0.2)
|
||||||
connection_pool (>= 2.2.5)
|
|
||||||
drb
|
|
||||||
i18n (>= 1.6, < 2)
|
i18n (>= 1.6, < 2)
|
||||||
minitest (>= 5.1)
|
minitest (>= 5.1)
|
||||||
mutex_m
|
|
||||||
tzinfo (~> 2.0)
|
tzinfo (~> 2.0)
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
base64 (0.2.0)
|
concurrent-ruby (1.1.10)
|
||||||
bigdecimal (3.1.8)
|
|
||||||
concurrent-ruby (1.3.4)
|
|
||||||
connection_pool (2.4.1)
|
|
||||||
diff-lcs (1.5.0)
|
diff-lcs (1.5.0)
|
||||||
drb (2.2.1)
|
i18n (1.12.0)
|
||||||
i18n (1.14.5)
|
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
minitest (5.25.1)
|
minitest (5.17.0)
|
||||||
mutex_m (0.2.0)
|
|
||||||
parallel (1.22.1)
|
parallel (1.22.1)
|
||||||
parser (3.1.2.0)
|
parser (3.1.2.0)
|
||||||
ast (~> 2.4.1)
|
ast (~> 2.4.1)
|
||||||
@@ -63,11 +52,10 @@ GEM
|
|||||||
parser (>= 3.1.1.0)
|
parser (>= 3.1.1.0)
|
||||||
ruby-progressbar (1.11.0)
|
ruby-progressbar (1.11.0)
|
||||||
strscan (3.1.0)
|
strscan (3.1.0)
|
||||||
timeout (0.4.1)
|
|
||||||
toml (0.3.0)
|
toml (0.3.0)
|
||||||
parslet (>= 1.8.0, < 3.0.0)
|
parslet (>= 1.8.0, < 3.0.0)
|
||||||
toxiproxy (2.0.1)
|
toxiproxy (2.0.1)
|
||||||
tzinfo (2.0.6)
|
tzinfo (2.0.5)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
unicode-display_width (2.1.0)
|
unicode-display_width (2.1.0)
|
||||||
|
|
||||||
|
|||||||
@@ -56,41 +56,6 @@ describe "Random Load Balancing" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when all replicas are down " do
|
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
|
|
||||||
|
|
||||||
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count = 0
|
|
||||||
number_of_replicas = processes[:replicas].length
|
|
||||||
|
|
||||||
# Take down all replicas
|
|
||||||
processes[:replicas].each(&:take_down)
|
|
||||||
|
|
||||||
(number_of_replicas + 1).times do |n|
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to eq(number_of_replicas + 1)
|
|
||||||
failed_count = 0
|
|
||||||
|
|
||||||
# Ban_time is configured to 60 so this reset will only work
|
|
||||||
# if the replicas are unbanned automatically
|
|
||||||
processes[:replicas].each(&:reset)
|
|
||||||
|
|
||||||
number_of_replicas.times do
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
expect(failed_count).to eq(0)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "Least Outstanding Queries Load Balancing" do
|
describe "Least Outstanding Queries Load Balancing" do
|
||||||
@@ -196,3 +161,4 @@ describe "Least Outstanding Queries Load Balancing" do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user