mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
Compare commits
15 Commits
v1.2.0
...
mostafa_sq
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6e11e11da | ||
|
|
4aaa4378cf | ||
|
|
670311daf9 | ||
|
|
b9ec7f8036 | ||
|
|
d91d23848b | ||
|
|
bbbc01a467 | ||
|
|
9bb71ede9d | ||
|
|
88b2afb19b | ||
|
|
f0865ca616 | ||
|
|
7d047c6c19 | ||
|
|
f73d15f82c | ||
|
|
69af6cc5e5 | ||
|
|
ca34597002 | ||
|
|
2def40ea6a | ||
|
|
c05129018d |
@@ -59,6 +59,7 @@ admin_password = "admin_pass"
|
|||||||
# session: one server connection per connected client
|
# session: one server connection per connected client
|
||||||
# transaction: one server connection per client transaction
|
# transaction: one server connection per client transaction
|
||||||
pool_mode = "transaction"
|
pool_mode = "transaction"
|
||||||
|
prepared_statements_cache_size = 500
|
||||||
|
|
||||||
# If the client doesn't specify, route traffic to
|
# If the client doesn't specify, route traffic to
|
||||||
# this role by default.
|
# this role by default.
|
||||||
@@ -141,6 +142,7 @@ query_parser_enabled = true
|
|||||||
query_parser_read_write_splitting = true
|
query_parser_read_write_splitting = true
|
||||||
primary_reads_enabled = true
|
primary_reads_enabled = true
|
||||||
sharding_function = "pg_bigint_hash"
|
sharding_function = "pg_bigint_hash"
|
||||||
|
prepared_statements_cache_size = 500
|
||||||
|
|
||||||
[pools.simple_db.users.0]
|
[pools.simple_db.users.0]
|
||||||
username = "simple_user"
|
username = "simple_user"
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
|||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||||
|
|
||||||
# Start Toxiproxy
|
# Start Toxiproxy
|
||||||
|
kill -9 $(pgrep toxiproxy) || true
|
||||||
LOG_LEVEL=error toxiproxy-server &
|
LOG_LEVEL=error toxiproxy-server &
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
@@ -106,7 +107,7 @@ cd ../..
|
|||||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||||
#
|
#
|
||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
python3 tests/python/tests.py || exit 1
|
pytest || exit 1
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
|
|||||||
|
|
||||||
# Allow for graceful shutdown
|
# Allow for graceful shutdown
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
|
kill -9 $(pgrep toxiproxy)
|
||||||
|
sleep 1
|
||||||
|
|||||||
4
.github/workflows/chart-lint-test.yaml
vendored
4
.github/workflows/chart-lint-test.yaml
vendored
@@ -22,7 +22,7 @@ jobs:
|
|||||||
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
||||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4.1.0
|
uses: actions/setup-python@v5.1.0
|
||||||
with:
|
with:
|
||||||
python-version: 3.7
|
python-version: 3.7
|
||||||
|
|
||||||
@@ -43,7 +43,7 @@ jobs:
|
|||||||
run: ct lint --config ct.yaml
|
run: ct lint --config ct.yaml
|
||||||
|
|
||||||
- name: Create kind cluster
|
- name: Create kind cluster
|
||||||
uses: helm/kind-action@v1.7.0
|
uses: helm/kind-action@v1.10.0
|
||||||
if: steps.list-changed.outputs.changed == 'true'
|
if: steps.list-changed.outputs.changed == 'true'
|
||||||
|
|
||||||
- name: Run chart-testing (install)
|
- name: Run chart-testing (install)
|
||||||
|
|||||||
15
.github/workflows/publish-deb-package.yml
vendored
15
.github/workflows/publish-deb-package.yml
vendored
@@ -1,6 +1,9 @@
|
|||||||
name: pgcat package (deb)
|
name: pgcat package (deb)
|
||||||
|
|
||||||
on:
|
on:
|
||||||
|
push:
|
||||||
|
tags:
|
||||||
|
- v*
|
||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
inputs:
|
inputs:
|
||||||
packageVersion:
|
packageVersion:
|
||||||
@@ -16,6 +19,14 @@ jobs:
|
|||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
|
- name: Set package version
|
||||||
|
if: github.event_name == 'push' # For push event
|
||||||
|
run: |
|
||||||
|
TAG=${{ github.ref_name }}
|
||||||
|
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
|
||||||
|
- name: Set package version (manual dispatch)
|
||||||
|
if: github.event_name == 'workflow_dispatch' # For manual dispatch
|
||||||
|
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
|
||||||
- uses: actions-rs/toolchain@v1
|
- uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
toolchain: stable
|
toolchain: stable
|
||||||
@@ -39,10 +50,10 @@ jobs:
|
|||||||
export ARCH=arm64
|
export ARCH=arm64
|
||||||
fi
|
fi
|
||||||
|
|
||||||
bash utilities/deb.sh ${{ inputs.packageVersion }}
|
bash utilities/deb.sh ${{ env.packageVersion }}
|
||||||
|
|
||||||
deb-s3 upload \
|
deb-s3 upload \
|
||||||
--lock \
|
--lock \
|
||||||
--bucket apt.postgresml.org \
|
--bucket apt.postgresml.org \
|
||||||
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
||||||
--codename $(lsb_release -cs)
|
--codename $(lsb_release -cs)
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -11,3 +11,4 @@ dev/.bash_history
|
|||||||
dev/cache
|
dev/cache
|
||||||
!dev/cache/.keepme
|
!dev/cache/.keepme
|
||||||
.venv
|
.venv
|
||||||
|
**/__pycache__
|
||||||
|
|||||||
@@ -6,6 +6,32 @@ Thank you for contributing! Just a few tips here:
|
|||||||
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
||||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||||
|
|
||||||
|
## How to run the integration tests locally and iterate on them
|
||||||
|
We have integration tests written in Ruby, Python, Go and Rust.
|
||||||
|
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
||||||
|
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
||||||
|
|
||||||
|
|
||||||
|
Quite simply, make sure you have docker installed and then run
|
||||||
|
`./start_test_env.sh`
|
||||||
|
|
||||||
|
That is it!
|
||||||
|
|
||||||
|
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
|
||||||
|
|
||||||
|
Once the environment is ready, you can run the tests by running
|
||||||
|
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||||
|
Python: `cd /app/ && pytest`
|
||||||
|
Rust: `cd /app/tests/rust && cargo run`
|
||||||
|
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||||
|
|
||||||
|
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
|
||||||
|
To rebuild PgCat, just run `cargo build` within the container under `/app`
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Happy hacking!
|
Happy hacking!
|
||||||
|
|
||||||
## TODOs
|
## TODOs
|
||||||
|
|||||||
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -1526,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlparser"
|
name = "sqlparser"
|
||||||
version = "0.41.0"
|
version = "0.51.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
"sqlparser_derive",
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
|||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = { version = "0.41", features = ["visitor"] }
|
sqlparser = { version = "0.51", features = ["visitor"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
parking_lot = "0.12.1"
|
parking_lot = "0.12.1"
|
||||||
|
|||||||
@@ -268,6 +268,8 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
|
|||||||
|
|
||||||
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
|
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
|
||||||
|
|
||||||
|
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
|
||||||
|
|
||||||
### Live configuration reloading
|
### Live configuration reloading
|
||||||
|
|
||||||
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
|
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.
|
||||||
|
|||||||
2124
grafana_dashboard.json
Normal file
2124
grafana_dashboard.json
Normal file
File diff suppressed because it is too large
Load Diff
4
postinst
4
postinst
@@ -7,3 +7,7 @@ systemctl enable pgcat
|
|||||||
if ! id pgcat 2> /dev/null; then
|
if ! id pgcat 2> /dev/null; then
|
||||||
useradd -s /usr/bin/false pgcat
|
useradd -s /usr/bin/false pgcat
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if [ -f /etc/pgcat.toml ]; then
|
||||||
|
systemctl start pgcat
|
||||||
|
fi
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use crate::config::AuthType;
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::pool::ConnectionPool;
|
use crate::pool::ConnectionPool;
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
@@ -71,6 +72,7 @@ impl AuthPassthrough {
|
|||||||
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
||||||
let auth_user = crate::config::User {
|
let auth_user = crate::config::User {
|
||||||
username: self.user.clone(),
|
username: self.user.clone(),
|
||||||
|
auth_type: AuthType::MD5,
|
||||||
password: Some(self.password.clone()),
|
password: Some(self.password.clone()),
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
|
|||||||
161
src/client.rs
161
src/client.rs
@@ -14,7 +14,9 @@ use tokio::sync::mpsc::Sender;
|
|||||||
|
|
||||||
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
||||||
use crate::auth_passthrough::refetch_auth_hash;
|
use crate::auth_passthrough::refetch_auth_hash;
|
||||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
use crate::config::{
|
||||||
|
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
|
||||||
|
};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::plugins::PluginOutput;
|
use crate::plugins::PluginOutput;
|
||||||
@@ -463,8 +465,8 @@ where
|
|||||||
.count()
|
.count()
|
||||||
== 1;
|
== 1;
|
||||||
|
|
||||||
// Kick any client that's not admin while we're in admin-only mode.
|
|
||||||
if !admin && admin_only {
|
if !admin && admin_only {
|
||||||
|
// Kick any client that's not admin while we're in admin-only mode.
|
||||||
debug!(
|
debug!(
|
||||||
"Rejecting non-admin connection to {} when in admin only mode",
|
"Rejecting non-admin connection to {} when in admin only mode",
|
||||||
pool_name
|
pool_name
|
||||||
@@ -481,6 +483,105 @@ where
|
|||||||
let process_id: i32 = rand::random();
|
let process_id: i32 = rand::random();
|
||||||
let secret_key: i32 = rand::random();
|
let secret_key: i32 = rand::random();
|
||||||
|
|
||||||
|
let mut prepared_statements_enabled = false;
|
||||||
|
|
||||||
|
// Authenticate admin user.
|
||||||
|
let (transaction_mode, mut server_parameters) = if admin {
|
||||||
|
let config = get_config();
|
||||||
|
// TODO: Add SASL support.
|
||||||
|
// Perform MD5 authentication.
|
||||||
|
match config.general.admin_auth_type {
|
||||||
|
AuthType::Trust => (),
|
||||||
|
AuthType::MD5 => {
|
||||||
|
let salt = md5_challenge(&mut write).await?;
|
||||||
|
|
||||||
|
let code = match read.read_u8().await {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password code".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// PasswordMessage
|
||||||
|
if code as char != 'p' {
|
||||||
|
return Err(Error::ProtocolSyncError(format!(
|
||||||
|
"Expected p, got {}",
|
||||||
|
code as char
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let len = match read.read_i32().await {
|
||||||
|
Ok(len) => len,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message length".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||||
|
|
||||||
|
match read.read_exact(&mut password_response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Compare server and client hashes.
|
||||||
|
let password_hash = md5_hash_password(
|
||||||
|
&config.general.admin_username,
|
||||||
|
&config.general.admin_password,
|
||||||
|
&salt,
|
||||||
|
);
|
||||||
|
|
||||||
|
if password_hash != password_response {
|
||||||
|
let error =
|
||||||
|
Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
||||||
|
|
||||||
|
warn!("{}", error);
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(false, generate_server_parameters_for_admin())
|
||||||
|
}
|
||||||
|
// Authenticate normal user.
|
||||||
|
else {
|
||||||
|
let pool = match get_pool(pool_name, username) {
|
||||||
|
Some(pool) => pool,
|
||||||
|
None => {
|
||||||
|
error_response(
|
||||||
|
&mut write,
|
||||||
|
&format!(
|
||||||
|
"No pool configured for database: {:?}, user: {:?}",
|
||||||
|
pool_name, username
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
return Err(Error::ClientGeneralError(
|
||||||
|
"Invalid pool name".into(),
|
||||||
|
client_identifier,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
||||||
|
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
||||||
|
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
||||||
|
match pool.settings.user.auth_type {
|
||||||
|
AuthType::Trust => (),
|
||||||
|
AuthType::MD5 => {
|
||||||
// Perform MD5 authentication.
|
// Perform MD5 authentication.
|
||||||
// TODO: Add SASL support.
|
// TODO: Add SASL support.
|
||||||
let salt = md5_challenge(&mut write).await?;
|
let salt = md5_challenge(&mut write).await?;
|
||||||
@@ -525,54 +626,6 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut prepared_statements_enabled = false;
|
|
||||||
|
|
||||||
// Authenticate admin user.
|
|
||||||
let (transaction_mode, mut server_parameters) = if admin {
|
|
||||||
let config = get_config();
|
|
||||||
|
|
||||||
// Compare server and client hashes.
|
|
||||||
let password_hash = md5_hash_password(
|
|
||||||
&config.general.admin_username,
|
|
||||||
&config.general.admin_password,
|
|
||||||
&salt,
|
|
||||||
);
|
|
||||||
|
|
||||||
if password_hash != password_response {
|
|
||||||
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
|
||||||
|
|
||||||
warn!("{}", error);
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
|
|
||||||
(false, generate_server_parameters_for_admin())
|
|
||||||
}
|
|
||||||
// Authenticate normal user.
|
|
||||||
else {
|
|
||||||
let pool = match get_pool(pool_name, username) {
|
|
||||||
Some(pool) => pool,
|
|
||||||
None => {
|
|
||||||
error_response(
|
|
||||||
&mut write,
|
|
||||||
&format!(
|
|
||||||
"No pool configured for database: {:?}, user: {:?}",
|
|
||||||
pool_name, username
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
return Err(Error::ClientGeneralError(
|
|
||||||
"Invalid pool name".into(),
|
|
||||||
client_identifier,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
|
||||||
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
|
||||||
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
|
||||||
let password_hash = if let Some(password) = &pool.settings.user.password {
|
let password_hash = if let Some(password) = &pool.settings.user.password {
|
||||||
Some(md5_hash_password(username, password, &salt))
|
Some(md5_hash_password(username, password, &salt))
|
||||||
} else {
|
} else {
|
||||||
@@ -593,7 +646,10 @@ where
|
|||||||
|
|
||||||
match refetch_auth_hash(&pool).await {
|
match refetch_auth_hash(&pool).await {
|
||||||
Ok(fetched_hash) => {
|
Ok(fetched_hash) => {
|
||||||
warn!("Password for {}, obtained. Updating.", client_identifier);
|
warn!(
|
||||||
|
"Password for {}, obtained. Updating.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut pool_auth_hash = pool.auth_hash.write();
|
let mut pool_auth_hash = pool.auth_hash.write();
|
||||||
@@ -658,7 +714,8 @@ where
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
||||||
prepared_statements_enabled =
|
prepared_statements_enabled =
|
||||||
transaction_mode && pool.prepared_statement_cache.is_some();
|
transaction_mode && pool.prepared_statement_cache.is_some();
|
||||||
|
|||||||
@@ -208,6 +208,9 @@ impl Address {
|
|||||||
pub struct User {
|
pub struct User {
|
||||||
pub username: String,
|
pub username: String,
|
||||||
pub password: Option<String>,
|
pub password: Option<String>,
|
||||||
|
|
||||||
|
#[serde(default = "User::default_auth_type")]
|
||||||
|
pub auth_type: AuthType,
|
||||||
pub server_username: Option<String>,
|
pub server_username: Option<String>,
|
||||||
pub server_password: Option<String>,
|
pub server_password: Option<String>,
|
||||||
pub pool_size: u32,
|
pub pool_size: u32,
|
||||||
@@ -225,6 +228,7 @@ impl Default for User {
|
|||||||
User {
|
User {
|
||||||
username: String::from("postgres"),
|
username: String::from("postgres"),
|
||||||
password: None,
|
password: None,
|
||||||
|
auth_type: AuthType::MD5,
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
pool_size: 15,
|
pool_size: 15,
|
||||||
@@ -239,6 +243,10 @@ impl Default for User {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
|
pub fn default_auth_type() -> AuthType {
|
||||||
|
AuthType::MD5
|
||||||
|
}
|
||||||
|
|
||||||
fn validate(&self) -> Result<(), Error> {
|
fn validate(&self) -> Result<(), Error> {
|
||||||
if let Some(min_pool_size) = self.min_pool_size {
|
if let Some(min_pool_size) = self.min_pool_size {
|
||||||
if min_pool_size > self.pool_size {
|
if min_pool_size > self.pool_size {
|
||||||
@@ -334,6 +342,9 @@ pub struct General {
|
|||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_admin_auth_type")]
|
||||||
|
pub admin_auth_type: AuthType,
|
||||||
|
|
||||||
#[serde(default = "General::default_validate_config")]
|
#[serde(default = "General::default_validate_config")]
|
||||||
pub validate_config: bool,
|
pub validate_config: bool,
|
||||||
|
|
||||||
@@ -348,6 +359,10 @@ impl General {
|
|||||||
"0.0.0.0".into()
|
"0.0.0.0".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn default_admin_auth_type() -> AuthType {
|
||||||
|
AuthType::MD5
|
||||||
|
}
|
||||||
|
|
||||||
pub fn default_port() -> u16 {
|
pub fn default_port() -> u16 {
|
||||||
5432
|
5432
|
||||||
}
|
}
|
||||||
@@ -456,6 +471,7 @@ impl Default for General {
|
|||||||
verify_server_certificate: false,
|
verify_server_certificate: false,
|
||||||
admin_username: String::from("admin"),
|
admin_username: String::from("admin"),
|
||||||
admin_password: String::from("admin"),
|
admin_password: String::from("admin"),
|
||||||
|
admin_auth_type: AuthType::MD5,
|
||||||
validate_config: true,
|
validate_config: true,
|
||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
@@ -476,6 +492,15 @@ pub enum PoolMode {
|
|||||||
Session,
|
Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
|
||||||
|
pub enum AuthType {
|
||||||
|
#[serde(alias = "trust", alias = "Trust")]
|
||||||
|
Trust,
|
||||||
|
|
||||||
|
#[serde(alias = "md5", alias = "MD5")]
|
||||||
|
MD5,
|
||||||
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for PoolMode {
|
impl std::fmt::Display for PoolMode {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
|
|||||||
@@ -12,19 +12,30 @@ use std::collections::HashMap;
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
use crate::config::Address;
|
use crate::config::Address;
|
||||||
use crate::pool::{get_all_pools, PoolIdentifier};
|
use crate::pool::{get_all_pools, PoolIdentifier};
|
||||||
|
use crate::stats::get_server_stats;
|
||||||
use crate::stats::pool::PoolStats;
|
use crate::stats::pool::PoolStats;
|
||||||
use crate::stats::{get_server_stats, ServerStats};
|
|
||||||
|
|
||||||
struct MetricHelpType {
|
struct MetricHelpType {
|
||||||
help: &'static str,
|
help: &'static str,
|
||||||
ty: &'static str,
|
ty: &'static str,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ServerPrometheusStats {
|
||||||
|
bytes_received: u64,
|
||||||
|
bytes_sent: u64,
|
||||||
|
transaction_count: u64,
|
||||||
|
query_count: u64,
|
||||||
|
error_count: u64,
|
||||||
|
active_count: u64,
|
||||||
|
idle_count: u64,
|
||||||
|
login_count: u64,
|
||||||
|
tested_count: u64,
|
||||||
|
}
|
||||||
|
|
||||||
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
|
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
|
||||||
// counters only increase
|
// counters only increase
|
||||||
// gauges can arbitrarily increase or decrease
|
// gauges can arbitrarily increase or decrease
|
||||||
@@ -127,22 +138,46 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
|
|||||||
},
|
},
|
||||||
"servers_bytes_received" => MetricHelpType {
|
"servers_bytes_received" => MetricHelpType {
|
||||||
help: "Volume in bytes of network traffic received by server",
|
help: "Volume in bytes of network traffic received by server",
|
||||||
ty: "gauge",
|
ty: "counter",
|
||||||
},
|
},
|
||||||
"servers_bytes_sent" => MetricHelpType {
|
"servers_bytes_sent" => MetricHelpType {
|
||||||
help: "Volume in bytes of network traffic sent by server",
|
help: "Volume in bytes of network traffic sent by server",
|
||||||
ty: "gauge",
|
ty: "counter",
|
||||||
},
|
},
|
||||||
"servers_transaction_count" => MetricHelpType {
|
"servers_transaction_count" => MetricHelpType {
|
||||||
help: "Number of transactions executed by server",
|
help: "Number of transactions executed by server",
|
||||||
ty: "gauge",
|
ty: "counter",
|
||||||
},
|
},
|
||||||
"servers_query_count" => MetricHelpType {
|
"servers_query_count" => MetricHelpType {
|
||||||
help: "Number of queries executed by server",
|
help: "Number of queries executed by server",
|
||||||
ty: "gauge",
|
ty: "counter",
|
||||||
},
|
},
|
||||||
"servers_error_count" => MetricHelpType {
|
"servers_error_count" => MetricHelpType {
|
||||||
help: "Number of errors",
|
help: "Number of errors",
|
||||||
|
ty: "counter",
|
||||||
|
},
|
||||||
|
"servers_idle_count" => MetricHelpType {
|
||||||
|
help: "Number of server connection in idle state",
|
||||||
|
ty: "gauge",
|
||||||
|
},
|
||||||
|
"servers_active_count" => MetricHelpType {
|
||||||
|
help: "Number of server connection in active state",
|
||||||
|
ty: "gauge",
|
||||||
|
},
|
||||||
|
"servers_tested_count" => MetricHelpType {
|
||||||
|
help: "Number of server connection in tested state",
|
||||||
|
ty: "gauge",
|
||||||
|
},
|
||||||
|
"servers_login_count" => MetricHelpType {
|
||||||
|
help: "Number of server connection in login state",
|
||||||
|
ty: "gauge",
|
||||||
|
},
|
||||||
|
"servers_is_banned" => MetricHelpType {
|
||||||
|
help: "0 if server is not banned, 1 if server is banned",
|
||||||
|
ty: "gauge",
|
||||||
|
},
|
||||||
|
"servers_is_paused" => MetricHelpType {
|
||||||
|
help: "0 if server is not paused, 1 if server is paused",
|
||||||
ty: "gauge",
|
ty: "gauge",
|
||||||
},
|
},
|
||||||
"databases_pool_size" => MetricHelpType {
|
"databases_pool_size" => MetricHelpType {
|
||||||
@@ -165,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {
|
|||||||
|
|
||||||
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
let formatted_labels = self
|
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
|
||||||
.labels
|
sorted_labels.sort_by_key(|&(key, _)| key);
|
||||||
|
let formatted_labels = sorted_labels
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join(",");
|
.join(",");
|
||||||
write!(
|
write!(
|
||||||
f,
|
f,
|
||||||
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
|
"{name}{{{formatted_labels}}} {value}",
|
||||||
name = format_args!("pgcat_{}", self.name),
|
name = format_args!("pgcat_{}", self.name),
|
||||||
help = self.help,
|
|
||||||
ty = self.ty,
|
|
||||||
formatted_labels = formatted_labels,
|
formatted_labels = formatted_labels,
|
||||||
value = self.value
|
value = self.value
|
||||||
)
|
)
|
||||||
@@ -210,7 +244,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("shard", address.shard.to_string());
|
labels.insert("shard", address.shard.to_string());
|
||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
|
labels.insert("username", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -225,7 +261,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("shard", address.shard.to_string());
|
labels.insert("shard", address.shard.to_string());
|
||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
|
labels.insert("username", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -236,7 +274,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("shard", address.shard.to_string());
|
labels.insert("shard", address.shard.to_string());
|
||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
|
labels.insert("username", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -248,6 +288,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
|
|
||||||
Self::from_name(&format!("pools_{}", name), value, labels)
|
Self::from_name(&format!("pools_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_header(&self) -> String {
|
||||||
|
format!(
|
||||||
|
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
|
||||||
|
name = format_args!("pgcat_{}", self.name),
|
||||||
|
help = self.help,
|
||||||
|
ty = self.ty,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn prometheus_stats(
|
async fn prometheus_stats(
|
||||||
@@ -273,6 +322,7 @@ async fn prometheus_stats(
|
|||||||
|
|
||||||
// Adds metrics shown in a SHOW STATS admin command.
|
// Adds metrics shown in a SHOW STATS admin command.
|
||||||
fn push_address_stats(lines: &mut Vec<String>) {
|
fn push_address_stats(lines: &mut Vec<String>) {
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
@@ -282,7 +332,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_address(address, &key, value)
|
PrometheusMetric::<u64>::from_address(address, &key, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(key)
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -290,33 +343,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
||||||
fn push_pool_stats(lines: &mut Vec<String>) {
|
fn push_pool_stats(lines: &mut Vec<String>) {
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||||
let pool_stats = PoolStats::construct_pool_lookup();
|
let pool_stats = PoolStats::construct_pool_lookup();
|
||||||
for (pool_id, stats) in pool_stats.iter() {
|
for (pool_id, stats) in pool_stats.iter() {
|
||||||
for (name, value) in stats.clone() {
|
for (name, value) in stats.clone() {
|
||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(name)
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
||||||
fn push_database_stats(lines: &mut Vec<String>) {
|
fn push_database_stats(lines: &mut Vec<String>) {
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
let pool_config = pool.settings.clone();
|
let pool_config = pool.settings.clone();
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
let address = pool.address(shard, server);
|
let address = pool.address(shard, server);
|
||||||
let pool_state = pool.pool_state(shard, server);
|
let pool_state = pool.pool_state(shard, server);
|
||||||
|
|
||||||
let metrics = vec![
|
let metrics = vec![
|
||||||
("pool_size", pool_config.user.pool_size),
|
("pool_size", pool_config.user.pool_size),
|
||||||
("current_connections", pool_state.connections),
|
("current_connections", pool_state.connections),
|
||||||
@@ -325,7 +398,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(key.to_string())
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -333,45 +409,73 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||||
fn push_server_stats(lines: &mut Vec<String>) {
|
fn push_server_stats(lines: &mut Vec<String>) {
|
||||||
let server_stats = get_server_stats();
|
let server_stats = get_server_stats();
|
||||||
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
|
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
|
||||||
for (_, stats) in server_stats {
|
for (_, stats) in server_stats {
|
||||||
server_stats_by_addresses.insert(stats.address_name(), stats);
|
let entry = prom_stats
|
||||||
|
.entry(stats.address_name())
|
||||||
|
.or_insert(ServerPrometheusStats {
|
||||||
|
bytes_received: 0,
|
||||||
|
bytes_sent: 0,
|
||||||
|
transaction_count: 0,
|
||||||
|
query_count: 0,
|
||||||
|
error_count: 0,
|
||||||
|
active_count: 0,
|
||||||
|
idle_count: 0,
|
||||||
|
login_count: 0,
|
||||||
|
tested_count: 0,
|
||||||
|
});
|
||||||
|
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
|
||||||
|
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
|
||||||
|
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
|
||||||
|
entry.query_count += stats.query_count.load(Ordering::Relaxed);
|
||||||
|
entry.error_count += stats.error_count.load(Ordering::Relaxed);
|
||||||
|
match stats.state.load(Ordering::Relaxed) {
|
||||||
|
crate::stats::ServerState::Login => entry.login_count += 1,
|
||||||
|
crate::stats::ServerState::Active => entry.active_count += 1,
|
||||||
|
crate::stats::ServerState::Tested => entry.tested_count += 1,
|
||||||
|
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
let address = pool.address(shard, server);
|
let address = pool.address(shard, server);
|
||||||
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
|
if let Some(server_info) = prom_stats.get(&address.name()) {
|
||||||
let metrics = [
|
let metrics = [
|
||||||
(
|
("bytes_received", server_info.bytes_received),
|
||||||
"bytes_received",
|
("bytes_sent", server_info.bytes_sent),
|
||||||
server_info.bytes_received.load(Ordering::Relaxed),
|
("transaction_count", server_info.transaction_count),
|
||||||
),
|
("query_count", server_info.query_count),
|
||||||
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
|
("error_count", server_info.error_count),
|
||||||
(
|
("idle_count", server_info.idle_count),
|
||||||
"transaction_count",
|
("active_count", server_info.active_count),
|
||||||
server_info.transaction_count.load(Ordering::Relaxed),
|
("login_count", server_info.login_count),
|
||||||
),
|
("tested_count", server_info.tested_count),
|
||||||
(
|
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
|
||||||
"query_count",
|
("is_paused", if pool.paused() { 1 } else { 0 }),
|
||||||
server_info.query_count.load(Ordering::Relaxed),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"error_count",
|
|
||||||
server_info.error_count.load(Ordering::Relaxed),
|
|
||||||
),
|
|
||||||
];
|
];
|
||||||
for (key, value) in metrics {
|
for (key, value) in metrics {
|
||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
||||||
{
|
{
|
||||||
lines.push(prometheus_metric.to_string());
|
grouped_metrics
|
||||||
|
.entry(key.to_string())
|
||||||
|
.or_default()
|
||||||
|
.push(prometheus_metric);
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -380,6 +484,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (_key, metrics) in grouped_metrics {
|
||||||
|
if !metrics.is_empty() {
|
||||||
|
lines.push(metrics[0].get_header());
|
||||||
|
for metric in metrics {
|
||||||
|
lines.push(metric.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||||
|
|||||||
@@ -1399,6 +1399,19 @@ mod test {
|
|||||||
assert!(!qr.query_parser_enabled());
|
assert!(!qr.query_parser_enabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_query_parser() {
|
||||||
|
QueryRouter::setup();
|
||||||
|
let mut qr = QueryRouter::new();
|
||||||
|
qr.pool_settings.query_parser_read_write_splitting = true;
|
||||||
|
|
||||||
|
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
|
||||||
|
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||||
|
|
||||||
|
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
|
||||||
|
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_update_from_pool_settings() {
|
fn test_update_from_pool_settings() {
|
||||||
QueryRouter::setup();
|
QueryRouter::setup();
|
||||||
|
|||||||
34
start_test_env.sh
Executable file
34
start_test_env.sh
Executable file
@@ -0,0 +1,34 @@
|
|||||||
|
GREEN="\033[0;32m"
|
||||||
|
RED="\033[0;31m"
|
||||||
|
BLUE="\033[0;34m"
|
||||||
|
RESET="\033[0m"
|
||||||
|
|
||||||
|
|
||||||
|
cd tests/docker/
|
||||||
|
docker compose kill main || true
|
||||||
|
docker compose build main
|
||||||
|
docker compose down
|
||||||
|
docker compose up -d
|
||||||
|
# wait for the container to start
|
||||||
|
while ! docker compose exec main ls; do
|
||||||
|
echo "Waiting for test environment to start"
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
echo "==================================="
|
||||||
|
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
|
||||||
|
docker compose exec --workdir /app main cargo build
|
||||||
|
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
|
||||||
|
docker compose exec --workdir /app/tests/ruby main bundle install
|
||||||
|
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
|
||||||
|
echo "Interactive test environment ready"
|
||||||
|
echo "To run integration tests, you can use the following commands:"
|
||||||
|
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||||
|
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
||||||
|
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||||
|
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||||
|
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||||
|
echo "You can rebuild PgCat from within the container by running"
|
||||||
|
echo -e " ${GREEN}cargo build${RESET}"
|
||||||
|
echo "and then run the tests again"
|
||||||
|
echo "==================================="
|
||||||
|
docker compose exec --workdir /app/tests main bash
|
||||||
@@ -1,4 +1,3 @@
|
|||||||
version: "3"
|
|
||||||
services:
|
services:
|
||||||
pg1:
|
pg1:
|
||||||
image: postgres:14
|
image: postgres:14
|
||||||
@@ -48,6 +47,8 @@ services:
|
|||||||
main:
|
main:
|
||||||
build: .
|
build: .
|
||||||
command: ["bash", "/app/tests/docker/run.sh"]
|
command: ["bash", "/app/tests/docker/run.sh"]
|
||||||
|
environment:
|
||||||
|
- INTERACTIVE_TEST_ENVIRONMENT=true
|
||||||
volumes:
|
volumes:
|
||||||
- ../../:/app/
|
- ../../:/app/
|
||||||
- /app/target/
|
- /app/target/
|
||||||
|
|||||||
@@ -5,6 +5,38 @@ rm /app/*.profraw || true
|
|||||||
rm /app/pgcat.profdata || true
|
rm /app/pgcat.profdata || true
|
||||||
rm -rf /app/cov || true
|
rm -rf /app/cov || true
|
||||||
|
|
||||||
|
# Prepares the interactive test environment
|
||||||
|
#
|
||||||
|
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
|
||||||
|
ports=(5432 7432 8432 9432 10432)
|
||||||
|
for port in "${ports[@]}"; do
|
||||||
|
is_it_up=0
|
||||||
|
attempts=0
|
||||||
|
while [ $is_it_up -eq 0 ]; do
|
||||||
|
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
|
||||||
|
if [ $? -eq 0 ]; then
|
||||||
|
echo "PostgreSQL on port $port is up."
|
||||||
|
is_it_up=1
|
||||||
|
else
|
||||||
|
attempts=$((attempts+1))
|
||||||
|
if [ $attempts -gt 10 ]; then
|
||||||
|
echo "PostgreSQL on port $port is down, giving up."
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "PostgreSQL on port $port is down, waiting for it to start."
|
||||||
|
sleep 1
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
done
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
||||||
|
sleep 100000000000000000
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
|
||||||
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
||||||
export RUSTC_BOOTSTRAP=1
|
export RUSTC_BOOTSTRAP=1
|
||||||
export CARGO_INCREMENTAL=0
|
export CARGO_INCREMENTAL=0
|
||||||
|
|||||||
@@ -1,2 +1,3 @@
|
|||||||
|
pytest
|
||||||
psycopg2==2.9.3
|
psycopg2==2.9.3
|
||||||
psutil==5.9.1
|
psutil==5.9.1
|
||||||
71
tests/python/test_auth.py
Normal file
71
tests/python/test_auth.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
import utils
|
||||||
|
import signal
|
||||||
|
|
||||||
|
class TestTrustAuth:
|
||||||
|
@classmethod
|
||||||
|
def setup_method(cls):
|
||||||
|
config= """
|
||||||
|
[general]
|
||||||
|
host = "0.0.0.0"
|
||||||
|
port = 6432
|
||||||
|
admin_username = "admin_user"
|
||||||
|
admin_password = ""
|
||||||
|
admin_auth_type = "trust"
|
||||||
|
|
||||||
|
[pools.sharded_db.users.0]
|
||||||
|
username = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
auth_type = "trust"
|
||||||
|
pool_size = 10
|
||||||
|
min_pool_size = 1
|
||||||
|
pool_mode = "transaction"
|
||||||
|
|
||||||
|
[pools.sharded_db.shards.0]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432, "primary" ],
|
||||||
|
]
|
||||||
|
database = "shard0"
|
||||||
|
"""
|
||||||
|
utils.pgcat_generic_start(config)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def teardown_method(self):
|
||||||
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
def test_admin_trust_auth(self):
|
||||||
|
conn, cur = utils.connect_db_trust(admin=True)
|
||||||
|
cur.execute("SHOW POOLS")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
def test_normal_trust_auth(self):
|
||||||
|
conn, cur = utils.connect_db_trust(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
class TestMD5Auth:
|
||||||
|
@classmethod
|
||||||
|
def setup_method(cls):
|
||||||
|
utils.pgcat_start()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def teardown_method(self):
|
||||||
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
def test_normal_db_access(self):
|
||||||
|
conn, cur = utils.connect_db(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
def test_admin_db_access(self):
|
||||||
|
conn, cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
|
cur.execute("SHOW POOLS")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
@@ -1,84 +1,12 @@
|
|||||||
from typing import Tuple
|
|
||||||
import psycopg2
|
|
||||||
import psutil
|
|
||||||
import os
|
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
import utils
|
||||||
|
|
||||||
SHUTDOWN_TIMEOUT = 5
|
SHUTDOWN_TIMEOUT = 5
|
||||||
|
|
||||||
PGCAT_HOST = "127.0.0.1"
|
|
||||||
PGCAT_PORT = "6432"
|
|
||||||
|
|
||||||
|
|
||||||
def pgcat_start():
|
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
|
||||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
|
||||||
try:
|
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
|
||||||
if "pgcat" == proc.name():
|
|
||||||
os.kill(proc.pid, signal)
|
|
||||||
except Exception as e:
|
|
||||||
# The process can be gone when we send this signal
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
|
||||||
# Returns 0 if pgcat process exists
|
|
||||||
time.sleep(2)
|
|
||||||
if not os.system('pgrep pgcat'):
|
|
||||||
raise Exception("pgcat not closed after SIGTERM")
|
|
||||||
|
|
||||||
|
|
||||||
def connect_db(
|
|
||||||
autocommit: bool = True,
|
|
||||||
admin: bool = False,
|
|
||||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
|
||||||
|
|
||||||
if admin:
|
|
||||||
user = "admin_user"
|
|
||||||
password = "admin_pass"
|
|
||||||
db = "pgcat"
|
|
||||||
else:
|
|
||||||
user = "sharding_user"
|
|
||||||
password = "sharding_user"
|
|
||||||
db = "sharded_db"
|
|
||||||
|
|
||||||
conn = psycopg2.connect(
|
|
||||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
|
||||||
connect_timeout=2,
|
|
||||||
)
|
|
||||||
conn.autocommit = autocommit
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
return (conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
|
||||||
cur.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
def test_normal_db_access():
|
|
||||||
pgcat_start()
|
|
||||||
conn, cur = connect_db(autocommit=False)
|
|
||||||
cur.execute("SELECT 1")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def test_admin_db_access():
|
|
||||||
conn, cur = connect_db(admin=True)
|
|
||||||
|
|
||||||
cur.execute("SHOW POOLS")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def test_shutdown_logic():
|
def test_shutdown_logic():
|
||||||
|
|
||||||
@@ -86,17 +14,17 @@ def test_shutdown_logic():
|
|||||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and send query (not in transaction)
|
# Create client connection and send query (not in transaction)
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cur.execute("COMMIT;")
|
cur.execute("COMMIT;")
|
||||||
|
|
||||||
# Send sigint to pgcat
|
# Send sigint to pgcat
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||||
@@ -108,18 +36,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -138,24 +66,24 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH SIGINT
|
# HANDLE TRANSACTION WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||||
@@ -165,18 +93,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -194,30 +122,30 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
time_taken = time.perf_counter() - start
|
time_taken = time.perf_counter() - start
|
||||||
if time_taken > 0.1:
|
if time_taken > 0.1:
|
||||||
@@ -227,49 +155,49 @@ def test_shutdown_logic():
|
|||||||
else:
|
else:
|
||||||
raise Exception("Able connect to database during shutdown")
|
raise Exception("Able connect to database during shutdown")
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn, cur = connect_db(admin=True)
|
conn, cur = utils.connect_db(admin=True)
|
||||||
cur.execute("SHOW DATABASES;")
|
cur.execute("SHOW DATABASES;")
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception(e)
|
raise Exception(e)
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
admin_cur.execute("SHOW DATABASES;")
|
admin_cur.execute("SHOW DATABASES;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -277,24 +205,24 @@ def test_shutdown_logic():
|
|||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception("Could not execute admin command:", e)
|
raise Exception("Could not execute admin command:", e)
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
|
|
||||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||||
@@ -308,12 +236,7 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint and expected timeout")
|
raise Exception("Server not closed after sigint and expected timeout")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
|
||||||
test_normal_db_access()
|
|
||||||
test_admin_db_access()
|
|
||||||
test_shutdown_logic()
|
|
||||||
110
tests/python/utils.py
Normal file
110
tests/python/utils.py
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
from typing import Tuple
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import psutil
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
PGCAT_HOST = "127.0.0.1"
|
||||||
|
PGCAT_PORT = "6432"
|
||||||
|
|
||||||
|
|
||||||
|
def _pgcat_start(config_path: str):
|
||||||
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
os.system(f"./target/debug/pgcat {config_path} &")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_start():
|
||||||
|
_pgcat_start(config_path='.circleci/pgcat.toml')
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_generic_start(config: str):
|
||||||
|
tmp = tempfile.NamedTemporaryFile()
|
||||||
|
with open(tmp.name, 'w') as f:
|
||||||
|
f.write(config)
|
||||||
|
_pgcat_start(config_path=tmp.name)
|
||||||
|
|
||||||
|
|
||||||
|
def glauth_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if proc.name() == "glauth":
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep glauth'):
|
||||||
|
raise Exception("glauth not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if "pgcat" == proc.name():
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep pgcat'):
|
||||||
|
raise Exception("pgcat not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
|
def connect_db(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
password = "admin_pass"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
def connect_db_trust(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
@@ -24,7 +24,8 @@ GEM
|
|||||||
pg (1.3.2)
|
pg (1.3.2)
|
||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
regexp_parser (2.3.1)
|
regexp_parser (2.3.1)
|
||||||
rexml (3.2.5)
|
rexml (3.3.6)
|
||||||
|
strscan
|
||||||
rspec (3.11.0)
|
rspec (3.11.0)
|
||||||
rspec-core (~> 3.11.0)
|
rspec-core (~> 3.11.0)
|
||||||
rspec-expectations (~> 3.11.0)
|
rspec-expectations (~> 3.11.0)
|
||||||
@@ -50,6 +51,7 @@ GEM
|
|||||||
rubocop-ast (1.17.0)
|
rubocop-ast (1.17.0)
|
||||||
parser (>= 3.1.1.0)
|
parser (>= 3.1.1.0)
|
||||||
ruby-progressbar (1.11.0)
|
ruby-progressbar (1.11.0)
|
||||||
|
strscan (3.1.0)
|
||||||
toml (0.3.0)
|
toml (0.3.0)
|
||||||
parslet (>= 1.8.0, < 3.0.0)
|
parslet (>= 1.8.0, < 3.0.0)
|
||||||
toxiproxy (2.0.1)
|
toxiproxy (2.0.1)
|
||||||
|
|||||||
682
tests/rust/Cargo.lock
generated
682
tests/rust/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -15,16 +15,14 @@ async fn test_prepared_statements() {
|
|||||||
for _ in 0..5 {
|
for _ in 0..5 {
|
||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = tokio::task::spawn(async move {
|
||||||
for _ in 0..1000 {
|
for i in 0..1000 {
|
||||||
match sqlx::query("SELECT one").fetch_all(&pool).await {
|
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if err.to_string().contains("prepared statement") {
|
|
||||||
panic!("prepared statement error: {}", err);
|
panic!("prepared statement error: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
handles.push(handle);
|
handles.push(handle);
|
||||||
|
|||||||
Reference in New Issue
Block a user