Compare commits

..

1 Commits

Author SHA1 Message Date
Saraj Munjal
78982a68e4 Upgrade TLS libraries and code 2024-09-05 12:21:26 -07:00
36 changed files with 852 additions and 1754 deletions

View File

@@ -26,7 +26,6 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Start Toxiproxy
kill -9 $(pgrep toxiproxy) || true
LOG_LEVEL=error toxiproxy-server &
sleep 1
@@ -107,7 +106,7 @@ cd ../..
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
#
pip3 install -r tests/python/requirements.txt
pytest || exit 1
python3 tests/python/tests.py || exit 1
#
@@ -178,6 +177,3 @@ killall pgcat -s SIGINT
# Allow for graceful shutdown
sleep 1
kill -9 $(pgrep toxiproxy)
sleep 1

View File

@@ -22,7 +22,7 @@ jobs:
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
# yamllint (https://github.com/adrienverge/yamllint) which require Python
- name: Set up Python
uses: actions/setup-python@v5.1.0
uses: actions/setup-python@v4.1.0
with:
python-version: 3.7
@@ -43,7 +43,7 @@ jobs:
run: ct lint --config ct.yaml
- name: Create kind cluster
uses: helm/kind-action@v1.10.0
uses: helm/kind-action@v1.7.0
if: steps.list-changed.outputs.changed == 'true'
- name: Run chart-testing (install)

View File

@@ -32,7 +32,7 @@ jobs:
version: v3.13.0
- name: Run chart-releaser
uses: helm/chart-releaser-action@a917fd15b20e8b64b94d9158ad54cd6345335584 # v1.6.0
uses: helm/chart-releaser-action@be16258da8010256c6e82849661221415f031968 # v1.5.0
with:
charts_dir: charts
config: cr.yaml

View File

@@ -1,9 +1,6 @@
name: pgcat package (deb)
on:
push:
tags:
- v*
workflow_dispatch:
inputs:
packageVersion:
@@ -19,14 +16,6 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Set package version
if: github.event_name == 'push' # For push event
run: |
TAG=${{ github.ref_name }}
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
- name: Set package version (manual dispatch)
if: github.event_name == 'workflow_dispatch' # For manual dispatch
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
@@ -50,10 +39,10 @@ jobs:
export ARCH=arm64
fi
bash utilities/deb.sh ${{ env.packageVersion }}
bash utilities/deb.sh ${{ inputs.packageVersion }}
deb-s3 upload \
--lock \
--bucket apt.postgresml.org \
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
--codename $(lsb_release -cs)

2
.gitignore vendored
View File

@@ -11,5 +11,3 @@ dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv
**/__pycache__
.bundle

View File

@@ -36,11 +36,10 @@ Port at which prometheus exporter listens on.
### connect_timeout
```
path: general.connect_timeout
default: 1000 # milliseconds
default: 5000 # milliseconds
```
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
How long to wait before aborting a server connection (ms).
### idle_timeout
```
@@ -298,19 +297,6 @@ Load balancing mode
`random` selects the server at random
`loc` selects the server with the least outstanding busy connections
### checkout_failure_limit
```
path: pools.<pool_name>.checkout_failure_limit
default: 0 (disabled)
```
`Maximum number of checkout failures a client is allowed before it
gets disconnected. This is needed to prevent persistent client/server
imbalance in high availability setups where multiple PgCat instances are placed
behind a single load balancer. If for any reason a client lands on a PgCat instance that has
a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
in session mode
`
### default_role
```
path: pools.<pool_name>.default_role
@@ -322,45 +308,6 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified.
### db_activity_based_routing
```
path: pools.<pool_name>.db_activity_based_routing
default: false
```
If enabled, PgCat will route queries to the primary if the queried table was recently written to.
Only relevant when `query_parser_enabled` *and* `query_parser_read_write_splitting` is enabled.
##### Considerations:
- *This feature is experimental and may not work as expected.*
- This feature only works when the same PgCat instance is used for both reads and writes to the database.
- This feature is not relevant when the primary is not part of the pool of databases used for load balancing of read queries.
- If more than one PgCat instance is used for HA purposes, this feature will not work as expected. A way to still make it work is by using sticky sessions.
### db_activity_based_ms_init_delay
```
path: pools.<pool_name>.db_activity_based_ms_init_delay
default: 100
```
The delay in milliseconds before the first activity-based routing check is performed.
### db_activity_ttl
```
path: pools.<pool_name>.db_activity_ttl
default: 900
```
The time in seconds after which a DB is considered inactive when no queries/updates are performed to it.
### table_mutation_cache_ms_ttl
```
path: pools.<pool_name>.table_mutation_cache_ms_ttl
default: 50
```
The time in milliseconds after a write to a table that all queries to that table will be routed to the primary.
### prepared_statements_cache_size
```
path: general.prepared_statements_cache_size
@@ -515,18 +462,10 @@ path: pools.<pool_name>.users.<user_index>.pool_size
default: 9
```
Maximum number of server connections that can be established for this user.
Maximum number of server connections that can be established for this user
The maximum number of connection from a single Pgcat process to any database in the cluster
is the sum of pool_size across all users.
### min_pool_size
```
path: pools.<pool_name>.users.<user_index>.min_pool_size
default: 0
```
Minimum number of idle server connections to retain for this pool.
### statement_timeout
```
path: pools.<pool_name>.users.<user_index>.statement_timeout
@@ -536,16 +475,6 @@ default: 0
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
0 means it is disabled.
### connect_timeout
```
path: pools.<pool_name>.users.<user_index>.connect_timeout
default: <UNSET> # milliseconds
```
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
If unset, uses the `connect_timeout` defined globally.
## `pools.<pool_name>.shards.<shard_index>` Section
### servers
@@ -573,3 +502,4 @@ default: "shard0"
```
Database name (e.g. "postgres")

View File

@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app/ && pytest`
Python: `cd /app && python3 tests/python/tests.py`
Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test`

744
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.3.0"
version = "1.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -8,7 +8,7 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
bytes = "1"
md-5 = "0.10"
bb8 = "=0.8.6"
bb8 = "0.8.1"
async-trait = "0.1"
rand = "0.8"
chrono = "0.4"
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = { version = "0.52", features = ["visitor"] }
sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"
@@ -27,8 +27,8 @@ hmac = "0.12"
sha2 = "0.10"
base64 = "0.21"
stringprep = "0.1"
tokio-rustls = "0.24"
rustls-pemfile = "1"
tokio-rustls = "0.26"
rustls-pemfile = "2"
http-body-util = "0.1.2"
hyper = { version = "1.4.1", features = ["full"] }
hyper-util = { version = "0.1.7", features = ["tokio"] }
@@ -42,7 +42,8 @@ postgres-protocol = "0.6.5"
fallible-iterator = "0.2"
pin-project = "1"
webpki-roots = "0.23"
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls = { version = "0.23.12", features=["ring", "tls12"]}
rustls-pki-types = "1.8.0"
trust-dns-resolver = "0.22.0"
tokio-test = "0.4.2"
serde_json = "1"
@@ -55,10 +56,6 @@ tracing-subscriber = { version = "0.3.17", features = [
"std",
] }
lru = "0.12.0"
mini-moka = "0.10.3"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"
[dev-dependencies]
serial_test = "*"

View File

@@ -1,4 +1,4 @@
FROM rust:1.81.0-slim-bookworm AS builder
FROM rust:1.79.0-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y build-essential

View File

@@ -1,4 +1,4 @@
FROM cimg/rust:1.81.0
FROM cimg/rust:1.79.0
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN sudo apt-get update && \

View File

@@ -231,7 +231,7 @@ User.find_by_email("test@example.com")
```sql
-- Grab a bunch of users from shard 1
SET SHARD TO '1';
SELECT * FROM users LIMIT 10;
SELECT * FROM users LIMT 10;
-- Find by id
SET SHARDING KEY TO '1234';

View File

@@ -2,7 +2,7 @@ apiVersion: v2
name: pgcat
description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.
maintainers:
- name: PostgresML
email: team@postgresml.org
appVersion: "1.3.0"
version: 0.2.5
- name: Wildcard
email: support@w6d.io
appVersion: "1.2.0"
version: 0.2.0

View File

@@ -15,7 +15,6 @@ stringData:
connect_timeout = {{ .Values.configuration.general.connect_timeout }}
idle_timeout = {{ .Values.configuration.general.idle_timeout | int }}
server_lifetime = {{ .Values.configuration.general.server_lifetime | int }}
server_tls = {{ .Values.configuration.general.server_tls }}
idle_client_in_transaction_timeout = {{ .Values.configuration.general.idle_client_in_transaction_timeout | int }}
healthcheck_timeout = {{ .Values.configuration.general.healthcheck_timeout }}
healthcheck_delay = {{ .Values.configuration.general.healthcheck_delay }}
@@ -51,10 +50,6 @@ stringData:
query_parser_enabled = {{ default true $pool.query_parser_enabled }}
query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }}
primary_reads_enabled = {{ default true $pool.primary_reads_enabled }}
db_activity_based_routing = {{ default false $pool.db_activity_based_routing }}
db_activity_based_ms_init_delay = {{ default 100 $pool.db_activity_based_ms_init_delay }}
db_activity_ttl = {{ default 900 $pool.db_activity_ttl }}
table_mutation_cache_ttl = {{ default 50 $pool.table_mutation_cache_ttl }}
sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }}
{{- range $index, $user := $pool.users }}
@@ -63,21 +58,11 @@ stringData:
##
[pools.{{ $pool.name | quote }}.users.{{ $index }}]
username = {{ $user.username | quote }}
{{- if $user.password }}
password = {{ $user.password | quote }}
{{- else if and $user.passwordSecret.name $user.passwordSecret.key }}
{{- $secret := (lookup "v1" "Secret" $.Release.Namespace $user.passwordSecret.name) }}
{{- if $secret }}
{{- $password := index $secret.data $user.passwordSecret.key | b64dec }}
password = {{ $password | quote }}
{{- end }}
{{- end }}
pool_size = {{ $user.pool_size }}
statement_timeout = {{ default 0 $user.statement_timeout }}
min_pool_size = {{ default 3 $user.min_pool_size }}
{{- if $user.server_lifetime }}
server_lifetime = {{ $user.server_lifetime }}
{{- end }}
statement_timeout = {{ $user.statement_timeout }}
min_pool_size = 3
server_lifetime = 60000
{{- if and $user.server_username $user.server_password }}
server_username = {{ $user.server_username | quote }}
server_password = {{ $user.server_password | quote }}

View File

@@ -175,9 +175,6 @@ configuration:
# Max connection lifetime before it's closed, even if actively used.
server_lifetime: 86400000 # 24 hours
# Whether to use TLS for server connections or not.
server_tls: false
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout: 0 # milliseconds
@@ -298,22 +295,6 @@ configuration:
# ## @param configuration.poolsPostgres.query_parser_read_write_splitting
# query_parser_read_write_splitting: true
# ## Db activity based routing. If enabled, we'll route queries to the primary if the table was recently mutated.
# ## @param configuration.poolsPostgres.db_activity_based_routing
# db_activity_based_routing: false
# ## DB activity based init delay. How long to wait before starting to route queries to the primary after a table mutation.
# ## @param configuration.poolsPostgres.db_activity_based_ms_init_delay
# db_activity_based_ms_init_delay: 100
# ## DB activity TTL. How long before marking the DB as inactive after no mutations or queries.
# ## @param configuration.poolsPostgres.db_activity_ttl
# db_activity_ttl: 900
# ## Table mutation cache TTL. How long to keep track of table mutations.
# ## @param configuration.poolsPostgres.table_mutation_cache_ttl
# table_mutation_cache_ttl: 50
# ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# ## load balancing of read queries. Otherwise, the primary will only be used for write
# ## queries. The primary can always be explicitly selected with our custom protocol.
@@ -334,9 +315,7 @@ configuration:
# ## Credentials for users that may connect to this cluster
# ## @param users [array]
# ## @param users[0].username Name of the env var (required)
# ## @param users[0].password Value for the env var (required) leave empty to use existing secret see passwordSecret.name and passwordSecret.key
# ## @param users[0].passwordSecret.name Name of the secret containing the password
# ## @param users[0].passwordSecret.key Key in the secret containing the password
# ## @param users[0].password Value for the env var (required)
# ## @param users[0].pool_size Maximum number of server connections that can be established for this user
# ## @param users[0].statement_timeout Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
# users: []

View File

@@ -1,2 +1 @@
sign: false
pages_branch: main

View File

@@ -179,7 +179,7 @@ primary_reads_enabled = true
# `random`: picks a shard at random
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
# default_shard = "shard_0"
# no_shard_specified_behavior = "shard_0"
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?

View File

@@ -7,7 +7,3 @@ systemctl enable pgcat
if ! id pgcat 2> /dev/null; then
useradd -s /usr/bin/false pgcat
fi
if [ -f /etc/pgcat.toml ]; then
systemctl start pgcat
fi

View File

@@ -1,4 +1,3 @@
use crate::config::AuthType;
use crate::errors::Error;
use crate::pool::ConnectionPool;
use crate::server::Server;
@@ -72,7 +71,6 @@ impl AuthPassthrough {
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
let auth_user = crate::config::User {
username: self.user.clone(),
auth_type: AuthType::MD5,
password: Some(self.password.clone()),
server_username: None,
server_password: None,

View File

@@ -14,9 +14,7 @@ use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
use crate::auth_passthrough::refetch_auth_hash;
use crate::config::{
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
};
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
use crate::constants::*;
use crate::messages::*;
use crate::plugins::PluginOutput;
@@ -465,8 +463,8 @@ where
.count()
== 1;
if !admin && admin_only {
// Kick any client that's not admin while we're in admin-only mode.
if !admin && admin_only {
debug!(
"Rejecting non-admin connection to {} when in admin only mode",
pool_name
@@ -483,16 +481,8 @@ where
let process_id: i32 = rand::random();
let secret_key: i32 = rand::random();
let mut prepared_statements_enabled = false;
// Authenticate admin user.
let (transaction_mode, mut server_parameters) = if admin {
let config = get_config();
// TODO: Add SASL support.
// Perform MD5 authentication.
match config.general.admin_auth_type {
AuthType::Trust => (),
AuthType::MD5 => {
// TODO: Add SASL support.
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await {
@@ -535,6 +525,12 @@ where
}
};
let mut prepared_statements_enabled = false;
// Authenticate admin user.
let (transaction_mode, mut server_parameters) = if admin {
let config = get_config();
// Compare server and client hashes.
let password_hash = md5_hash_password(
&config.general.admin_username,
@@ -543,16 +539,14 @@ where
);
if password_hash != password_response {
let error =
Error::ClientGeneralError("Invalid password".into(), client_identifier);
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
warn!("{}", error);
wrong_password(&mut write, username).await?;
return Err(error);
}
}
}
(false, generate_server_parameters_for_admin())
}
// Authenticate normal user.
@@ -579,53 +573,6 @@ where
// Obtain the hash to compare, we give preference to that written in cleartext in config
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
// when the pool was created. If there is no hash there, we try to fetch it one more time.
match pool.settings.user.auth_type {
AuthType::Trust => (),
AuthType::MD5 => {
// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
let password_hash = if let Some(password) = &pool.settings.user.password {
Some(md5_hash_password(username, password, &salt))
} else {
@@ -646,10 +593,7 @@ where
match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => {
warn!(
"Password for {}, obtained. Updating.",
client_identifier
);
warn!("Password for {}, obtained. Updating.", client_identifier);
{
let mut pool_auth_hash = pool.auth_hash.write();
@@ -714,8 +658,7 @@ where
));
}
}
}
}
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
prepared_statements_enabled =
transaction_mode && pool.prepared_statement_cache.is_some();
@@ -859,8 +802,6 @@ where
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
let mut checkout_failure_count: u64 = 0;
self.stats.register(self.stats.clone());
// Result returned by one of the plugins.
@@ -883,7 +824,6 @@ where
};
query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
@@ -1110,25 +1050,7 @@ where
query_router.role(),
err
);
checkout_failure_count += 1;
if let Some(limit) = pool.settings.checkout_failure_limit {
if checkout_failure_count >= limit {
error!(
"Checkout failure limit reached ({} / {}) - disconnecting client",
checkout_failure_count, limit
);
error_response_terminal(
&mut self.write,
&format!(
"checkout failure limit reached ({} / {})",
checkout_failure_count, limit
),
)
.await?;
self.stats.disconnect();
return Ok(());
}
}
continue;
}
};
@@ -1331,7 +1253,7 @@ where
self.buffer_describe(message).await?;
}
// Execute2
// Execute
// Execute a prepared statement prepared in `P` and bound in `B`.
'E' => {
self.extended_protocol_data_buffer

View File

@@ -208,9 +208,6 @@ impl Address {
pub struct User {
pub username: String,
pub password: Option<String>,
#[serde(default = "User::default_auth_type")]
pub auth_type: AuthType,
pub server_username: Option<String>,
pub server_password: Option<String>,
pub pool_size: u32,
@@ -228,7 +225,6 @@ impl Default for User {
User {
username: String::from("postgres"),
password: None,
auth_type: AuthType::MD5,
server_username: None,
server_password: None,
pool_size: 15,
@@ -243,10 +239,6 @@ impl Default for User {
}
impl User {
pub fn default_auth_type() -> AuthType {
AuthType::MD5
}
fn validate(&self) -> Result<(), Error> {
if let Some(min_pool_size) = self.min_pool_size {
if min_pool_size > self.pool_size {
@@ -342,9 +334,6 @@ pub struct General {
pub admin_username: String,
pub admin_password: String,
#[serde(default = "General::default_admin_auth_type")]
pub admin_auth_type: AuthType,
#[serde(default = "General::default_validate_config")]
pub validate_config: bool,
@@ -359,10 +348,6 @@ impl General {
"0.0.0.0".into()
}
pub fn default_admin_auth_type() -> AuthType {
AuthType::MD5
}
pub fn default_port() -> u16 {
5432
}
@@ -471,7 +456,6 @@ impl Default for General {
verify_server_certificate: false,
admin_username: String::from("admin"),
admin_password: String::from("admin"),
admin_auth_type: AuthType::MD5,
validate_config: true,
auth_query: None,
auth_query_user: None,
@@ -492,15 +476,6 @@ pub enum PoolMode {
Session,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
pub enum AuthType {
#[serde(alias = "trust", alias = "Trust")]
Trust,
#[serde(alias = "md5", alias = "MD5")]
MD5,
}
impl std::fmt::Display for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@@ -558,14 +533,6 @@ pub struct Pool {
/// Close idle connections that have been opened for longer than this.
pub idle_timeout: Option<u64>,
/// Maximum number of checkout failures a client is allowed before it
/// gets disconnected. This is needed to prevent persistent client/server
/// imbalance in high availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
/// in session mode
pub checkout_failure_limit: Option<u64>,
/// Close server connections that have been opened for longer than this.
/// Only applied to idle connections. If the connection is actively used for
/// longer than this period, the pool will not interrupt it.
@@ -597,19 +564,6 @@ pub struct Pool {
#[serde(default = "Pool::default_prepared_statements_cache_size")]
pub prepared_statements_cache_size: usize,
// Support for query routing based on database activity
#[serde(default = "Pool::default_db_activity_based_routing")]
pub db_activity_based_routing: bool,
#[serde(default = "Pool::default_db_activity_init_delay")]
pub db_activity_init_delay: u64,
#[serde(default = "Pool::default_db_activity_ttl")]
pub db_activity_ttl: u64,
#[serde(default = "Pool::default_table_mutation_cache_ms_ttl")]
pub table_mutation_cache_ms_ttl: u64,
pub plugins: Option<Plugins>,
pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>,
@@ -663,25 +617,6 @@ impl Pool {
0
}
pub fn default_db_activity_based_routing() -> bool {
false
}
pub fn default_db_activity_init_delay() -> u64 {
// 100 milliseconds
100
}
pub fn default_db_activity_ttl() -> u64 {
// 15 minutes
15 * 60
}
pub fn default_table_mutation_cache_ms_ttl() -> u64 {
// 50 milliseconds
50
}
pub fn validate(&mut self) -> Result<(), Error> {
match self.default_role.as_ref() {
"any" => (),
@@ -764,23 +699,6 @@ impl Pool {
user.validate()?;
}
if self.db_activity_based_routing {
if self.db_activity_init_delay == 0 {
error!("db_activity_init_delay must be greater than 0");
return Err(Error::BadConfig);
}
if self.table_mutation_cache_ms_ttl == 0 {
error!("table_mutation_cache_ms_ttl must be greater than 0");
return Err(Error::BadConfig);
}
if self.db_activity_ttl == 0 {
error!("db_activity_ttl must be greater than 0");
return Err(Error::BadConfig);
}
}
Ok(())
}
}
@@ -790,7 +708,6 @@ impl Default for Pool {
Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
checkout_failure_limit: None,
default_role: String::from("any"),
query_parser_enabled: false,
query_parser_max_length: None,
@@ -811,10 +728,6 @@ impl Default for Pool {
cleanup_server_connections: true,
log_client_parameter_status_changes: false,
prepared_statements_cache_size: Self::default_prepared_statements_cache_size(),
db_activity_based_routing: Self::default_db_activity_based_routing(),
db_activity_init_delay: Self::default_db_activity_init_delay(),
db_activity_ttl: Self::default_db_activity_ttl(),
table_mutation_cache_ms_ttl: Self::default_table_mutation_cache_ms_ttl(),
plugins: None,
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
users: BTreeMap::default(),
@@ -1307,17 +1220,6 @@ impl Config {
None => self.general.idle_timeout,
};
info!("[pool: {}] Idle timeout: {}ms", pool_name, idle_timeout);
match pool_config.checkout_failure_limit {
Some(checkout_failure_limit) => {
info!(
"[pool: {}] Checkout failure limit: {}",
pool_name, checkout_failure_limit
);
}
None => {
info!("[pool: {}] Checkout failure limit: not set", pool_name);
}
};
info!(
"[pool: {}] Sharding function: {}",
pool_name,
@@ -1362,22 +1264,6 @@ impl Config {
"[pool: {}] Cleanup server connections: {}",
pool_name, pool_config.cleanup_server_connections
);
info!(
"[pool: {}] DB activity based routing: {}",
pool_name, pool_config.db_activity_based_routing
);
info!(
"[pool: {}] DB activity init delay: {}",
pool_name, pool_config.db_activity_init_delay
);
info!(
"[pool: {}] DB activity TTL: {}",
pool_name, pool_config.db_activity_ttl
);
info!(
"[pool: {}] Table mutation cache TTL: {}",
pool_name, pool_config.table_mutation_cache_ms_ttl
);
info!(
"[pool: {}] Log client parameter status changes: {}",
pool_name, pool_config.log_client_parameter_status_changes

View File

@@ -3,7 +3,7 @@ use tracing_subscriber;
use tracing_subscriber::EnvFilter;
pub fn init(args: &Args) {
// Initialize a default filter, and then override the builtin default "warning" with our
// Iniitalize a default filter, and then override the builtin default "warning" with our
// commandline, (default: "info")
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());

View File

@@ -152,14 +152,6 @@ pub struct PoolSettings {
/// Random or LeastOutstandingConnections.
pub load_balancing_mode: LoadBalancingMode,
/// Maximum number of checkout failures a client is allowed before it
/// gets disconnected. This is needed to prevent persistent client/server
/// imbalance in high availability setups where multiple PgCat instances are placed
/// behind a single load balancer. If for any reason a client lands on a PgCat instance that has
/// a large number of connected clients, it might get stuck in perpetual checkout failure loop especially
/// in session mode
pub checkout_failure_limit: Option<u64>,
// Number of shards.
pub shards: usize,
@@ -182,18 +174,6 @@ pub struct PoolSettings {
// Read from the primary as well or not.
pub primary_reads_enabled: bool,
// Automatic primary/replica selection based on recent activity.
pub db_activity_based_routing: bool,
// DB activity init delay
pub db_activity_init_delay: u64,
// DB activity TTL
pub db_activity_ttl: u64,
// Table mutation cache TTL
pub table_mutation_cache_ms_ttl: u64,
// Sharding function.
pub sharding_function: ShardingFunction,
@@ -235,7 +215,6 @@ impl Default for PoolSettings {
PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 1,
user: User::default(),
db: String::default(),
@@ -244,10 +223,6 @@ impl Default for PoolSettings {
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: true,
db_activity_based_routing: false,
db_activity_init_delay: 100,
db_activity_ttl: 15 * 60,
table_mutation_cache_ms_ttl: 50,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
healthcheck_delay: General::default_healthcheck_delay(),
@@ -546,7 +521,6 @@ impl ConnectionPool {
None => pool_config.pool_mode,
},
load_balancing_mode: pool_config.load_balancing_mode,
checkout_failure_limit: pool_config.checkout_failure_limit,
// shards: pool_config.shards.clone(),
shards: shard_ids.len(),
user: user.clone(),
@@ -563,10 +537,6 @@ impl ConnectionPool {
.query_parser_read_write_splitting,
primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function,
db_activity_based_routing: pool_config.db_activity_based_routing,
db_activity_init_delay: pool_config.db_activity_init_delay,
db_activity_ttl: pool_config.db_activity_ttl,
table_mutation_cache_ms_ttl: pool_config.table_mutation_cache_ms_ttl,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
healthcheck_delay: config.general.healthcheck_delay,
healthcheck_timeout: config.general.healthcheck_timeout,

View File

@@ -200,17 +200,18 @@ struct PrometheusMetric<Value: fmt::Display> {
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
sorted_labels.sort_by_key(|&(key, _)| key);
let formatted_labels = sorted_labels
let formatted_labels = self
.labels
.iter()
.map(|(key, value)| format!("{}=\"{}\"", key, value))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"{name}{{{formatted_labels}}} {value}",
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
formatted_labels = formatted_labels,
value = self.value
)
@@ -246,7 +247,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels)
}
@@ -263,8 +264,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels)
}
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels)
}
@@ -288,15 +288,6 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("pools_{}", name), value, labels)
}
fn get_header(&self) -> String {
format!(
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
)
}
}
async fn prometheus_stats(
@@ -309,7 +300,6 @@ async fn prometheus_stats(
push_pool_stats(&mut lines);
push_server_stats(&mut lines);
push_database_stats(&mut lines);
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
Response::builder()
.header("content-type", "text/plain; version=0.0.4")
@@ -323,7 +313,6 @@ async fn prometheus_stats(
// Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
@@ -333,10 +322,7 @@ fn push_address_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_address(address, &key, value)
{
grouped_metrics
.entry(key)
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -344,53 +330,33 @@ fn push_address_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{
grouped_metrics
.entry(name)
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for ({})", name, *pool_id);
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone();
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server);
let metrics = vec![
("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections),
@@ -399,10 +365,7 @@ fn push_database_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value)
{
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -410,14 +373,6 @@ fn push_database_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW SERVERS admin command.
@@ -450,7 +405,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
}
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
@@ -473,10 +428,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value)
{
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
lines.push(prometheus_metric.to_string());
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -485,14 +437,6 @@ fn push_server_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
pub async fn start_metric_server(http_addr: SocketAddr) {

View File

@@ -2,7 +2,6 @@
/// or implied query characteristics.
use bytes::{Buf, BytesMut};
use log::{debug, error};
use mini_moka::sync::Cache;
use once_cell::sync::OnceCell;
use regex::{Regex, RegexSet};
use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update};
@@ -12,7 +11,6 @@ use sqlparser::ast::{
};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::sync::OnceLock;
use crate::config::Role;
use crate::errors::Error;
@@ -23,7 +21,6 @@ use crate::sharding::Sharder;
use std::collections::BTreeSet;
use std::io::Cursor;
use std::time::Duration;
use std::{cmp, mem};
/// Regexes used to parse custom commands.
@@ -69,18 +66,6 @@ static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
// Get the value inside the custom command.
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
#[derive(Debug, Clone, PartialEq)]
enum DatabaseActivityState {
Active,
Initializing,
}
// A moka cache for the databases
// the key is the database name and the value is the database activity state
static DATABASE_ACTIVITY_CACHE: OnceLock<Cache<String, DatabaseActivityState>> = OnceLock::new();
// A moka cache for the tables, the key is the db_table.
static TABLE_MUTATIONS_CACHE: OnceLock<Cache<String, bool>> = OnceLock::new();
/// The query router.
pub struct QueryRouter {
/// Which shard we should be talking to right now.
@@ -102,12 +87,6 @@ pub struct QueryRouter {
placeholders: Vec<i16>,
}
struct ExtractedExprsAndTables<'a> {
exprs: Vec<Expr>,
table_names: Vec<Vec<Ident>>,
assignments_opt: Option<&'a Vec<Assignment>>,
}
impl QueryRouter {
/// One-time initialization of regexes
/// that parse our custom SQL protocol.
@@ -407,53 +386,6 @@ impl QueryRouter {
}
}
/// Determines if a query is a mutation or not.
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
use sqlparser::ast::*;
match q.body.as_ref() {
SetExpr::Insert(_) => true,
SetExpr::Update(_) => true,
SetExpr::Query(q) => Self::is_mutation_query(q),
_ => false,
}
}
fn database_activity_cache(&self) -> Cache<String, DatabaseActivityState> {
DATABASE_ACTIVITY_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_idle(Duration::from_secs(self.pool_settings.db_activity_ttl))
.build()
})
.clone()
}
/// Check database activity state and reset it if necessary
fn database_activity_state(&self, db: &String) -> DatabaseActivityState {
let cache = self.database_activity_cache();
// Exists in cache
if cache.contains_key(db) {
return cache.get(db).unwrap();
}
// Not in cache
debug!("Adding database to cache: {}", db);
cache.insert(db.to_string(), DatabaseActivityState::Initializing);
// Set a timer to update the cache
let db = db.clone();
let db_activity_init_delay = self.pool_settings.db_activity_init_delay;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(db_activity_init_delay)).await;
cache.insert(db, DatabaseActivityState::Active);
});
DatabaseActivityState::Initializing
}
/// Try to infer which server to connect to based on the contents of the query.
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
if !self.pool_settings.query_parser_read_write_splitting {
@@ -468,23 +400,9 @@ impl QueryRouter {
return Err(Error::QueryRouterParserError("empty query".into()));
}
let mut primary_set_based_on_activity = false;
let mut visited_write_statement = false;
let mut prev_inferred_shard = None;
if self.pool_settings.db_activity_based_routing {
let db = self.pool_settings.db.clone();
let state = self.database_activity_state(&db);
debug!("Database activity state: {:?}", state);
if let DatabaseActivityState::Initializing = state {
debug!("Database is initializing, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
}
}
for q in ast {
match q {
// All transactions go to the primary, probably a write.
@@ -495,22 +413,6 @@ impl QueryRouter {
// Likely a read-only query
Query(query) => {
if primary_set_based_on_activity {
// If we already set the role based on activity, we don't need to do it again
continue;
}
if self.pool_settings.db_activity_based_routing {
// Check if the tables in the query have been written to recently
if self.query_handles_tables_in_mutation_cache(query) {
debug!("Query handles tables in mutation cache, going to primary");
self.active_role = Some(Role::Primary);
primary_set_based_on_activity = true;
continue;
}
}
match &self.pool_settings.automatic_sharding_key {
Some(_) => {
// TODO: if we have multiple queries in the same message,
@@ -526,9 +428,8 @@ impl QueryRouter {
};
let has_locks = !query.locks.is_empty();
let has_mutation = Self::is_mutation_query(query);
if has_locks || has_mutation {
if has_locks {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary.
@@ -541,13 +442,6 @@ impl QueryRouter {
// Likely a write
_ => {
debug!("Write statement found, going to primary");
if self.pool_settings.db_activity_based_routing {
// add all of the query tables to the mutation cache
self.update_mutation_cache_on_write(q);
}
match &self.pool_settings.automatic_sharding_key {
Some(_) => {
// TODO: similar to the above, if we have multiple queries in the
@@ -590,69 +484,62 @@ impl QueryRouter {
Ok(())
}
fn table_mutations_cache(&self) -> Cache<String, bool> {
TABLE_MUTATIONS_CACHE
.get_or_init(|| {
Cache::builder()
.time_to_live(Duration::from_millis(
self.pool_settings.table_mutation_cache_ms_ttl,
))
.build()
})
.clone()
}
fn query_handles_tables_in_mutation_cache(&self, query: &sqlparser::ast::Query) -> bool {
let table_mutations_cache = self.table_mutations_cache();
debug!("Checking if query handles tables in mutation cache");
debug!("Table mutations cache: {:?}", table_mutations_cache);
for tables in self.table_names(query) {
for table in tables {
if table_mutations_cache.contains_key(&self.table_mutation_cache_key(table)) {
return true;
}
}
}
false
}
fn extract_exprs_and_table_names<'a>(
&'a self,
q: &'a Statement,
) -> Option<ExtractedExprsAndTables<'a>> {
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
let mut exprs = Vec::new();
// Collect all table names from the query.
let mut table_names = Vec::new();
let mut assignments_opt = None;
match q {
Insert(i) => {
Insert {
or,
into: _,
table_name,
columns,
overwrite: _,
source,
partitioned,
after_columns,
table: _,
on: _,
returning: _,
ignore: _,
} => {
// Not supported in postgres.
assert!(i.or.is_none());
assert!(i.partitioned.is_none());
assert!(i.after_columns.is_empty());
assert!(or.is_none());
assert!(partitioned.is_none());
assert!(after_columns.is_empty());
Self::process_table(&i.table_name, &mut table_names);
if let Some(source) = &i.source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(&i.columns));
Self::process_table(table_name, &mut table_names);
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
}
}
Delete(d) => {
if let Some(expr) = &d.selection {
Delete {
tables,
from,
using,
selection,
returning: _,
order_by: _,
limit: _,
} => {
if let Some(expr) = selection {
exprs.push(expr.clone());
}
// Multi-tables delete are not supported in postgres.
assert!(d.tables.is_empty());
// Multi tables delete are not supported in postgres.
assert!(tables.is_empty());
if let Some(using_tbl_with_join) = &d.using {
Self::process_tables_with_join(from, &mut exprs, &mut table_names);
if let Some(using_tbl_with_join) = using {
Self::process_tables_with_join(
using_tbl_with_join,
&mut exprs,
&mut table_names,
);
}
Self::process_selection(&d.selection, &mut exprs);
Self::process_selection(selection, &mut exprs);
}
Update {
table,
@@ -666,55 +553,14 @@ impl QueryRouter {
Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names);
}
Self::process_selection(selection, &mut exprs);
assignments_opt = Some(assignments);
}
_ => return None,
};
Some(ExtractedExprsAndTables {
exprs,
table_names,
assignments_opt,
})
}
fn infer_shard_on_write(&mut self, q: &Statement) -> Result<Option<usize>, Error> {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
let exprs = extracted.exprs;
let table_names = extracted.table_names;
let assignments_opt = extracted.assignments_opt;
if let Some(assignments) = assignments_opt {
self.assignment_parser(assignments)?;
}
_ => {
return Ok(None);
}
};
Ok(self.infer_shard_from_exprs(exprs, table_names))
} else {
Ok(None)
}
}
fn update_mutation_cache_on_write(&self, q: &Statement) {
if let Some(extracted) = self.extract_exprs_and_table_names(q) {
debug!("Updating mutation cache on write");
let table_names = extracted.table_names;
debug!("Table names in mutation query: {:?}", table_names);
let table_mutations_cache = self.table_mutations_cache();
for tables in table_names {
for table in tables {
table_mutations_cache.insert(self.table_mutation_cache_key(table), true);
}
}
}
}
// combines the database name and table name into a single string
// to be used as the key in the table mutation cache
// e.g. "mydb.mytable"
fn table_mutation_cache_key(&self, table: Ident) -> String {
format!("{}.{}", self.pool_settings.db, table.value)
}
fn process_query(
@@ -963,13 +809,7 @@ impl QueryRouter {
for a in assignments {
if sharding_key[0].value == "*"
&& sharding_key[1].value
== a.target
.to_string()
.split('.')
.last()
.unwrap()
.to_lowercase()
&& sharding_key[1].value == a.id.last().unwrap().value.to_lowercase()
{
return Err(Error::QueryRouterParserError(
"Sharding key cannot be updated.".into(),
@@ -1102,18 +942,6 @@ impl QueryRouter {
self.infer_shard_from_exprs(exprs, table_names)
}
/// get table names from query
fn table_names(&self, query: &sqlparser::ast::Query) -> Vec<Vec<Ident>> {
let mut exprs = Vec::new();
let mut table_names = Vec::new();
Self::process_query(query, &mut exprs, &mut table_names, &None);
debug!("Table names in query: {:?}", table_names);
table_names
}
fn infer_shard_from_exprs(
&mut self,
exprs: Vec<Expr>,
@@ -1220,11 +1048,6 @@ impl QueryRouter {
self.active_shard
}
/// Set active_role as the default_role specified in the pool.
pub fn set_default_role(&mut self) {
self.active_role = self.pool_settings.default_role;
}
/// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> {
self.active_role
@@ -1281,7 +1104,6 @@ mod test {
use crate::messages::simple_query;
use crate::sharding::ShardingFunction;
use bytes::BufMut;
use serial_test::serial;
#[test]
fn test_defaults() {
@@ -1291,26 +1113,6 @@ mod test {
assert_eq!(qr.role(), None);
}
#[test]
fn test_split_cte_queries() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
let query = simple_query(
"WITH t AS (
SELECT id FROM users WHERE name ILIKE '%ja%'
)
UPDATE user_languages
SET settings = '{}'
FROM t WHERE t.id = user_id;",
);
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[test]
fn test_infer_replica() {
QueryRouter::setup();
@@ -1617,7 +1419,6 @@ mod test {
let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: None,
shards: 2,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
@@ -1638,10 +1439,6 @@ mod test {
auth_query_password: None,
auth_query_user: None,
db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None,
};
let mut qr = QueryRouter::new();
@@ -1700,7 +1497,6 @@ mod test {
let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
checkout_failure_limit: Some(10),
shards: 5,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
@@ -1721,10 +1517,6 @@ mod test {
auth_query_password: None,
auth_query_user: None,
db: "test".to_string(),
db_activity_based_routing: PoolSettings::default().db_activity_based_routing,
db_activity_init_delay: PoolSettings::default().db_activity_init_delay,
db_activity_ttl: PoolSettings::default().db_activity_ttl,
table_mutation_cache_ms_ttl: PoolSettings::default().table_mutation_cache_ms_ttl,
plugins: None,
};
@@ -2140,150 +1932,4 @@ mod test {
assert_eq!(res, Ok(PluginOutput::Allow));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_initializing_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Initially, the database activity should be in the "Initializing" state
let state = qr.database_activity_state(&qr.pool_settings.db.clone());
assert_eq!(state, DatabaseActivityState::Initializing);
// Check that the router chooses the primary role due to "Initializing" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_active_state() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
let db_name = qr.pool_settings.db.clone();
let cache = qr.database_activity_cache();
cache.insert(db_name.clone(), DatabaseActivityState::Active);
let query = simple_query("SELECT * FROM some_table");
let ast = qr.parse(&query).unwrap();
// Check that the router can choose a replica role when in "Active" state
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), None); // Default should allow replica due to active state
}
#[tokio::test]
#[serial]
async fn test_table_mutation_cache_on_write() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.table_mutation_cache_ms_ttl = 20_000; // 20 seconds in milliseconds
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.db = "test_table_mutation_cache".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
let query = simple_query("UPDATE some_table SET col1 = 'value' WHERE col2 = 1");
let ast = qr.parse(&query).unwrap();
// Simulate the mutation query which should populate the mutation cache
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
let table_cache_key = qr.table_mutation_cache_key(Ident::new("some_table"));
let cache = qr.table_mutations_cache();
// Ensure the table mutation cache contains the table with recent write
assert!(cache.contains_key(&table_cache_key));
}
#[tokio::test]
#[serial]
async fn test_db_activity_based_routing_multi_query() {
use super::*;
use crate::messages::simple_query;
use tokio::time::Duration;
QueryRouter::setup();
let mut qr = QueryRouter::new();
// Configure the pool settings for db_activity_based_routing
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
qr.pool_settings.db_activity_based_routing = true;
qr.pool_settings.db = "test_db_activity_routing".to_string();
qr.database_activity_cache()
.invalidate(&qr.pool_settings.db.clone());
// First query when database is initializing
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
// Should route to primary because database is initializing
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the initialization delay to pass
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.db_activity_init_delay * 2,
))
.await;
// Next query after database is active
let query = simple_query("SELECT * FROM test_table");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because database is active and no recent mutations
assert_eq!(qr.role(), None);
// Simulate a write query to update the mutation cache
let query = simple_query("INSERT INTO test_table (id, name) VALUES (1, 'test')");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because it's a write operation
assert_eq!(qr.role(), Some(Role::Primary));
// Immediately run a read query on the same table
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to primary because the table was recently mutated
assert_eq!(qr.role(), Some(Role::Primary));
// Wait for the mutation cache TTL to expire
tokio::time::sleep(Duration::from_millis(
qr.pool_settings.table_mutation_cache_ms_ttl * 2,
))
.await;
// Run the read query again after cache expiration
let query = simple_query("SELECT * FROM test_table WHERE id = 1");
let ast = qr.parse(&query).unwrap();
qr.active_role = None; // Reset the active_role
assert!(qr.infer(&ast).is_ok());
// Should route to replica because mutation cache has expired
assert_eq!(qr.role(), None);
}
}

View File

@@ -7,6 +7,7 @@ use lru::LruCache;
use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
use postgres_protocol::message;
use rustls::pki_types::TrustAnchor;
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use std::net::IpAddr;
@@ -15,7 +16,6 @@ use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
use tokio::net::TcpStream;
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{client::TlsStream, TlsConnector};
use crate::config::{get_config, Address, User};
@@ -399,19 +399,18 @@ impl Server {
'S' => {
debug!("Connecting to server using TLS");
let mut root_store = RootCertStore::empty();
root_store.add_server_trust_anchors(
webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
OwnedTrustAnchor::from_subject_spki_name_constraints(
ta.subject,
ta.spki,
ta.name_constraints,
)
let root_store = rustls::RootCertStore::from_iter(
webpki_roots::TLS_SERVER_ROOTS
.0
.iter()
.map(|ta| TrustAnchor {
subject: ta.subject.into(),
subject_public_key_info: ta.spki.into(),
name_constraints: ta.name_constraints.map(|nc| nc.into()),
}),
);
let mut tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_store)
.with_no_client_auth();
@@ -426,7 +425,7 @@ impl Server {
let connector = TlsConnector::from(Arc::new(tls_config));
let stream = match connector
.connect(address.host.as_str().try_into().unwrap(), stream)
.connect(address.host.clone().try_into().unwrap(), stream)
.await
{
Ok(stream) => stream,

View File

@@ -1,36 +1,34 @@
// Stream wrapper.
use crate::config::get_config;
use crate::errors::Error;
use rustls::client::danger::HandshakeSignatureValid;
use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName, UnixTime};
use rustls::DigitallySignedStruct;
use rustls_pemfile::{certs, read_one, Item};
use std::iter;
use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;
use tokio_rustls::rustls::{
self,
client::{ServerCertVerified, ServerCertVerifier},
Certificate, PrivateKey, ServerName,
};
use tokio_rustls::rustls::crypto::{verify_tls12_signature, verify_tls13_signature};
use tokio_rustls::TlsAcceptor;
use crate::config::get_config;
use crate::errors::Error;
// TLS
pub fn load_certs(path: &Path) -> std::io::Result<Vec<Certificate>> {
pub fn load_certs(path: &Path) -> std::io::Result<Vec<CertificateDer>> {
certs(&mut std::io::BufReader::new(std::fs::File::open(path)?))
.collect::<Result<Vec<_>, _>>()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))
.map(|mut certs| certs.drain(..).map(Certificate).collect())
.map(|mut certs| std::mem::take(&mut certs))
}
pub fn load_keys(path: &Path) -> std::io::Result<Vec<PrivateKey>> {
pub fn load_keys(path: &Path) -> std::io::Result<Vec<PrivateKeyDer>> {
let mut rd = std::io::BufReader::new(std::fs::File::open(path)?);
iter::from_fn(|| read_one(&mut rd).transpose())
.filter_map(|item| match item {
Err(err) => Some(Err(err)),
Ok(Item::RSAKey(key)) => Some(Ok(PrivateKey(key))),
Ok(Item::ECKey(key)) => Some(Ok(PrivateKey(key))),
Ok(Item::PKCS8Key(key)) => Some(Ok(PrivateKey(key))),
Ok(Item::Pkcs1Key(key)) => Some(Ok(PrivateKeyDer::Pkcs1(key))),
Ok(Item::Sec1Key(key)) => Some(Ok(PrivateKeyDer::Sec1(key))),
Ok(Item::Pkcs8Key(key)) => Some(Ok(PrivateKeyDer::Pkcs8(key))),
_ => None,
})
.collect()
@@ -43,19 +41,18 @@ pub struct Tls {
impl Tls {
pub fn new() -> Result<Self, Error> {
let config = get_config();
let certs = match load_certs(Path::new(&config.general.tls_certificate.unwrap())) {
let cert_ref: String = config.general.tls_certificate.unwrap();
let certs = match load_certs(Path::new(cert_ref.leak())) {
Ok(certs) => certs,
Err(_) => return Err(Error::TlsError),
};
let mut keys = match load_keys(Path::new(&config.general.tls_private_key.unwrap())) {
let pk_ref = config.general.tls_private_key.unwrap();
let mut keys = match load_keys(Path::new(pk_ref.leak())) {
Ok(keys) => keys,
Err(_) => return Err(Error::TlsError),
};
let config = match rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, keys.remove(0))
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))
@@ -70,18 +67,52 @@ impl Tls {
}
}
#[derive(Debug)]
pub struct NoCertificateVerification;
impl ServerCertVerifier for NoCertificateVerification {
impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &Certificate,
_intermediates: &[Certificate],
_server_name: &ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: SystemTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp: &[u8],
_now: UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &CertificateDer<'_>,
dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
verify_tls12_signature(
message,
cert,
dss,
&rustls::crypto::ring::default_provider().signature_verification_algorithms,
)
}
fn verify_tls13_signature(
&self,
message: &[u8],
cert: &CertificateDer<'_>,
dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
verify_tls13_signature(
message,
cert,
dss,
&rustls::crypto::ring::default_provider().signature_verification_algorithms,
)
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
rustls::crypto::ring::default_provider()
.signature_verification_algorithms
.supported_schemes()
}
}

View File

@@ -23,7 +23,7 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"

View File

@@ -1,4 +1,4 @@
FROM rust:1.81.0-slim-bookworm
FROM rust:bullseye
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h

View File

@@ -1,3 +1,2 @@
pytest
psycopg2==2.9.3
psutil==5.9.1

View File

@@ -1,71 +0,0 @@
import utils
import signal
class TestTrustAuth:
@classmethod
def setup_method(cls):
config= """
[general]
host = "0.0.0.0"
port = 6432
admin_username = "admin_user"
admin_password = ""
admin_auth_type = "trust"
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
auth_type = "trust"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"
[pools.sharded_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
]
database = "shard0"
"""
utils.pgcat_generic_start(config)
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_admin_trust_auth(self):
conn, cur = utils.connect_db_trust(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_normal_trust_auth(self):
conn, cur = utils.connect_db_trust(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
class TestMD5Auth:
@classmethod
def setup_method(cls):
utils.pgcat_start()
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_normal_db_access(self):
conn, cur = utils.connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_admin_db_access(self):
conn, cur = utils.connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)

View File

@@ -1,12 +1,84 @@
from typing import Tuple
import psycopg2
import psutil
import os
import signal
import time
import psycopg2
import utils
SHUTDOWN_TIMEOUT = 5
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()
def test_normal_db_access():
pgcat_start()
conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
def test_admin_db_access():
conn, cur = connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
def test_shutdown_logic():
@@ -14,17 +86,17 @@ def test_shutdown_logic():
# NO ACTIVE QUERIES SIGINT HANDLING
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and send query (not in transaction)
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
cur.execute("COMMIT;")
# Send sigint to pgcat
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries fail after sigint since server should close with no active transactions
@@ -36,18 +108,18 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -66,24 +138,24 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries succeed after sigint since server should still allow transaction to complete
@@ -93,18 +165,18 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -122,30 +194,30 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
start = time.perf_counter()
try:
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("SELECT 1;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
time_taken = time.perf_counter() - start
if time_taken > 0.1:
@@ -155,49 +227,49 @@ def test_shutdown_logic():
else:
raise Exception("Able connect to database during shutdown")
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
conn, cur = utils.connect_db(admin=True)
conn, cur = connect_db(admin=True)
cur.execute("SHOW DATABASES;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
raise Exception(e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
admin_conn, admin_cur = utils.connect_db(admin=True)
admin_conn, admin_cur = connect_db(admin=True)
admin_cur.execute("SHOW DATABASES;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
@@ -205,24 +277,24 @@ def test_shutdown_logic():
except psycopg2.OperationalError as e:
raise Exception("Could not execute admin command:", e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
time.sleep(SHUTDOWN_TIMEOUT + 1)
@@ -236,7 +308,12 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint and expected timeout")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
test_normal_db_access()
test_admin_db_access()
test_shutdown_logic()

View File

@@ -1,110 +0,0 @@
import os
import signal
import time
from typing import Tuple
import tempfile
import psutil
import psycopg2
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def _pgcat_start(config_path: str):
pg_cat_send_signal(signal.SIGTERM)
os.system(f"./target/debug/pgcat {config_path} &")
time.sleep(2)
def pgcat_start():
_pgcat_start(config_path='.circleci/pgcat.toml')
def pgcat_generic_start(config: str):
tmp = tempfile.NamedTemporaryFile()
with open(tmp.name, 'w') as f:
f.write(config)
_pgcat_start(config_path=tmp.name)
def glauth_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if proc.name() == "glauth":
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep glauth'):
raise Exception("glauth not closed after SIGTERM")
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def connect_db_trust(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
db = "pgcat"
else:
user = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()

View File

@@ -1,33 +1,22 @@
GEM
remote: https://rubygems.org/
specs:
activemodel (7.1.4)
activesupport (= 7.1.4)
activerecord (7.1.4)
activemodel (= 7.1.4)
activesupport (= 7.1.4)
timeout (>= 0.4.0)
activesupport (7.1.4)
base64
bigdecimal
activemodel (7.0.4.1)
activesupport (= 7.0.4.1)
activerecord (7.0.4.1)
activemodel (= 7.0.4.1)
activesupport (= 7.0.4.1)
activesupport (7.0.4.1)
concurrent-ruby (~> 1.0, >= 1.0.2)
connection_pool (>= 2.2.5)
drb
i18n (>= 1.6, < 2)
minitest (>= 5.1)
mutex_m
tzinfo (~> 2.0)
ast (2.4.2)
base64 (0.2.0)
bigdecimal (3.1.8)
concurrent-ruby (1.3.4)
connection_pool (2.4.1)
concurrent-ruby (1.1.10)
diff-lcs (1.5.0)
drb (2.2.1)
i18n (1.14.5)
i18n (1.12.0)
concurrent-ruby (~> 1.0)
minitest (5.25.1)
mutex_m (0.2.0)
minitest (5.17.0)
parallel (1.22.1)
parser (3.1.2.0)
ast (~> 2.4.1)
@@ -35,8 +24,7 @@ GEM
pg (1.3.2)
rainbow (3.1.1)
regexp_parser (2.3.1)
rexml (3.3.6)
strscan
rexml (3.2.5)
rspec (3.11.0)
rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0)
@@ -62,12 +50,10 @@ GEM
rubocop-ast (1.17.0)
parser (>= 3.1.1.0)
ruby-progressbar (1.11.0)
strscan (3.1.0)
timeout (0.4.1)
toml (0.3.0)
parslet (>= 1.8.0, < 3.0.0)
toxiproxy (2.0.1)
tzinfo (2.0.6)
tzinfo (2.0.5)
concurrent-ruby (~> 1.0)
unicode-display_width (2.1.0)

View File

@@ -56,41 +56,6 @@ describe "Random Load Balancing" do
end
end
end
context "when all replicas are down " do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length
# Take down all replicas
processes[:replicas].each(&:take_down)
(number_of_replicas + 1).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(number_of_replicas + 1)
failed_count = 0
# Ban_time is configured to 60 so this reset will only work
# if the replicas are unbanned automatically
processes[:replicas].each(&:reset)
number_of_replicas.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(0)
end
end
end
describe "Least Outstanding Queries Load Balancing" do
@@ -196,3 +161,4 @@ describe "Least Outstanding Queries Load Balancing" do
end
end
end

View File

@@ -188,102 +188,6 @@ describe "Miscellaneous" do
end
end
describe "Checkout failure limit" do
context "when no checkout failure limit is set" do
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 200
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
sleep 0.5
end
it "does not disconnect client" do
Array.new(5) do
Thread.new do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
for i in 0..4
begin
conn.async_exec("SELECT pg_sleep(0.5);")
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::SystemError
expect(conn.status).to eq(PG::CONNECTION_OK)
end
end
conn.close
end
end.each(&:join)
end
end
context "when checkout failure limit is set high" do
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 200
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 10000
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
sleep 0.5
end
it "does not disconnect client" do
Array.new(5) do
Thread.new do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
for i in 0..4
begin
conn.async_exec("SELECT pg_sleep(0.5);")
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::SystemError
expect(conn.status).to eq(PG::CONNECTION_OK)
end
end
conn.close
end
end.each(&:join)
end
end
context "when checkout failure limit is set low" do
before do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 200
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
new_configs["pools"]["sharded_db"]["checkout_failure_limit"] = 2
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
sleep 0.5
end
it "disconnects client after reaching limit" do
Array.new(5) do
Thread.new do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
checkout_failure_count = 0
for i in 0..4
begin
conn.async_exec("SELECT pg_sleep(1);")
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::SystemError
checkout_failure_count += 1
expect(conn.status).to eq(PG::CONNECTION_OK)
rescue PG::ConnectionBad
expect(checkout_failure_count).to eq(2)
expect(conn.status).to eq(PG::CONNECTION_BAD)
break
end
end
conn.close
end
end.each(&:join)
puts processes.pgcat.logs
end
end
end
describe "Server version reporting" do
it "reports correct version for normal and admin databases" do
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))