Compare commits

...

10 Commits

Author SHA1 Message Date
Andrew Jackson
d404250fb8 formatted 2024-09-06 09:39:45 -05:00
CommanderKeynes
081b9f74e9 Add tests 2024-09-05 20:59:26 -05:00
CommanderKeynes
d4e8ff27e7 Initial fix 2024-09-05 10:03:29 -05:00
Andrew Jackson
7d047c6c19 Implemented python tests with pytest (#790)
Currently the python tests act as scripts. They generate a lot of output to stdout, which makes it very hard to figure out where problems were. Also, if you want to run only a single test, you basically need to comment out code to accomplish this.

This PR modifies the python tests to use the pytest testing framework. This framework allows individual tests to be targeted via the command line, without touching the source code. It also suppresses stdout by default, making the test output much easier to read. After the tests run, it also provides a summary of what failed, what succeeded, etc.


Co-authored-by: CommanderKeynes <andrewjackson947@gmail.coma>
Co-authored-by: Andrew Jackson <andrewjackson2988@gmail.com>
2024-09-05 08:16:45 -05:00
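For illustration, here is the kind of targeting this enables (a sketch; the test names are taken from the tests/python changes further down in this diff):

```sh
# Run the whole Python suite; stdout is captured by default
pytest

# Target a single test by its node id, without touching the source code
pytest tests/python/tests.py::test_shutdown_logic

# Re-enable stdout while debugging one test
pytest -s tests/python/tests.py::test_normal_db_access
```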
Andrew Jackson
f73d15f82c Fix CI script to allow consecutive runs locally (#793)
Co-authored-by: CommanderKeynes <andrewjackson947@gmail.coma>
2024-09-05 08:01:33 -05:00
Mostafa Abdelraouf
69af6cc5e5 Make iterating on integration tests easier (#789)
Writing and iterating on integration tests is cumbersome; having to wait 10 minutes for the test suite to run just to see whether your test works is unacceptable.

In this PR, I added a detailed workflow for writing tests that should shorten the feedback cycle for modifying tests to as little as a few seconds.

It involves opening a shell into a long-lived container that has all the necessary setup and dependencies, then running your desired tests directly there. I added a convenience script that bootstraps the environment and opens an interactive shell into the container; you can then run tests immediately in an environment that is more or less identical to what we have running in CircleCI.
2024-09-03 11:15:53 -05:00
Mostafa Abdelraouf
ca34597002 Fix broken integration test #740 (#787) 2024-08-31 17:15:13 -05:00
Mostafa Abdelraouf
2def40ea6a Add test case for issue 776 (#786)
I am adding a tiny test that uses the SQL statement that was reported to break an older version of the SQL parser library

#776
2024-08-31 10:52:33 -05:00
Mostafa Abdelraouf
c05129018d Improve Prometheus stats + Add Grafana dashboard (#785)
We were missing some labels on metrics generated by the Prometheus exporter, so I fixed that. There are still some gaps I want to address with respect to the metrics we track, but this seems like a good start.

I also created a Grafana Dashboard and exported it to JSON. It is designed with the same metric names the Prometheus exporter uses.
2024-08-31 08:18:57 -05:00
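As a quick way to inspect the result (a sketch: the port assumes the exporter's `prometheus_exporter_port` setting and may differ in your deployment; label values are illustrative but match the labels added in the prometheus.rs changes below):

```sh
# Scrape the exporter and filter one of the corrected counter metrics
curl -s http://127.0.0.1:9930/metrics | grep servers_query_count
# Expected shape:
# servers_query_count{shard="0",role="primary",pool="sharded_db",index="0",database="shard0",user="sharding_user"} 42
```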
Mostafa Abdelraouf
4a7a6a8e7a Cut 1.2.0 release (#783) 2024-08-30 08:30:16 -05:00
25 changed files with 2997 additions and 420 deletions


@@ -59,6 +59,7 @@ admin_password = "admin_pass"
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
prepared_statements_cache_size = 500
# If the client doesn't specify, route traffic to
# this role by default.
@@ -141,6 +142,7 @@ query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"
prepared_statements_cache_size = 500
[pools.simple_db.users.0]
username = "simple_user"


@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Start Toxiproxy
kill -9 $(pgrep toxiproxy) || true
LOG_LEVEL=error toxiproxy-server &
sleep 1
@@ -106,7 +107,7 @@ cd ../..
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
#
pip3 install -r tests/python/requirements.txt
python3 tests/python/tests.py || exit 1
pytest || exit 1
#
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
# Allow for graceful shutdown
sleep 1
kill -9 $(pgrep toxiproxy)
sleep 1

.gitignore (vendored, 3 changes)

@@ -10,4 +10,5 @@ lcov.info
dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv
.venv
**/__pycache__


@@ -6,6 +6,32 @@ Thank you for contributing! Just a few tips here:
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
## How to run the integration tests locally and iterate on them
We have integration tests written in Ruby, Python, Go and Rust.
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
Hear me out, this should be easy: it involves opening a shell into a container with all the necessary dependencies available, and you can modify the test code and immediately rerun your test in the interactive shell.
Quite simply, make sure you have docker installed and then run
`./start_test_env.sh`
That is it!
Within this test environment you can modify the test files in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app/ && pytest`
Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
You can also rebuild PgCat directly within the environment, and the tests will run against the newly built binary.
To rebuild PgCat, just run `cargo build` within the container under `/app`.
![Animated gif showing how to run tests](https://github.com/user-attachments/assets/2258fde3-2aed-4efb-bdc5-e4f12dcd4d33)
Happy hacking!
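A minimal edit-build-test loop inside the container might look like this (a sketch using the same paths as the commands above):

```sh
cd /app
cargo build                                    # rebuild PgCat after editing the source
./target/debug/pgcat ./.circleci/pgcat.toml &  # restart the freshly built pooler
pytest                                         # rerun the Python suite immediately
```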
## TODOs

Cargo.lock (generated, 2 changes)

@@ -1034,7 +1034,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.2.0"
dependencies = [
"arc-swap",
"async-trait",


@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html


@@ -268,6 +268,8 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
### Live configuration reloading
The config can be reloaded by sending `kill -s SIGHUP` to the process or by running `RELOAD` against the admin database. All settings except `host` and `port` can be reloaded without restarting the pooler, including sharding and replica configurations.
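Both reload paths can be sketched as follows (credentials and port are taken from the test configuration elsewhere in this diff and will differ in a real deployment):

```sh
# Option 1: signal the running process
kill -s SIGHUP $(pgrep pgcat)

# Option 2: run RELOAD against the admin database
PGPASSWORD=admin_pass psql -h 127.0.0.1 -p 6432 -U admin_user -d pgcat -c 'RELOAD'
```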


@@ -4,5 +4,5 @@ description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBounce
maintainers:
- name: Wildcard
email: support@w6d.io
appVersion: "1.1.1"
version: 0.1.0
appVersion: "1.2.0"
version: 0.2.0


@@ -170,13 +170,13 @@ configuration:
connect_timeout: 5000
# How long an idle connection with a server is left open (ms).
idle_timeout: 30000 # milliseconds
idle_timeout: 30000 # milliseconds
# Max connection lifetime before it's closed, even if actively used.
server_lifetime: 86400000 # 24 hours
server_lifetime: 86400000 # 24 hours
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout: 0 # milliseconds
idle_client_in_transaction_timeout: 0 # milliseconds
# @param configuration.general.healthcheck_timeout How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout: 1000
@@ -240,7 +240,15 @@ configuration:
## the pool_name is what clients use as database name when connecting
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
## @param [object]
pools: []
pools:
[{
name: "simple", pool_mode: "transaction",
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
shards: [{
servers: [{host: "postgres", port: 5432, role: "primary"}],
database: "postgres"
}]
}]
# - ## default values
# ##
# ##

grafana_dashboard.json (new file, 2124 changes)

File diff suppressed because it is too large.


@@ -346,6 +346,14 @@ where
// Client is requesting to cancel a running query (plain text connection).
CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)),
// Client is requesting a GSS encoded connection
GSSENC_REQUEST_CODE => {
error_response_terminal(stream, "").await?;
Err(Error::ProtocolSyncError(
"PGCat does not support GSSAPI encoding".into(),
))
}
// Something else, probably something is wrong and it's not our fault,
// e.g. badly implemented Postgres client.
_ => Err(Error::ProtocolSyncError(format!(


@@ -11,6 +11,9 @@ pub const SSL_REQUEST_CODE: i32 = 80877103;
// CancelRequest: the cancel request code.
pub const CANCEL_REQUEST_CODE: i32 = 80877102;
// GSSENCRequest: used to indicate the client wants a GSS connection
pub const GSSENC_REQUEST_CODE: i32 = 80877104;
// AuthenticationMD5Password
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
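With this change, a client that insists on GSS encryption is rejected up front instead of tripping a generic protocol sync error. For example, using libpq's standard `gssencmode` option (connection parameters here are the test credentials used elsewhere in this diff):

```sh
# Expected to fail fast: PgCat answers the GSSENC request with an error response
psql "host=127.0.0.1 port=6432 dbname=sharded_db user=sharding_user gssencmode=require"
```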


@@ -12,19 +12,30 @@ use std::collections::HashMap;
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::net::TcpListener;
use crate::config::Address;
use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::get_server_stats;
use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};
struct MetricHelpType {
help: &'static str,
ty: &'static str,
}
struct ServerPrometheusStats {
bytes_received: u64,
bytes_sent: u64,
transaction_count: u64,
query_count: u64,
error_count: u64,
active_count: u64,
idle_count: u64,
login_count: u64,
tested_count: u64,
}
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
// counters only increase
// gauges can arbitrarily increase or decrease
@@ -127,22 +138,46 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
},
"servers_bytes_received" => MetricHelpType {
help: "Volume in bytes of network traffic received by server",
ty: "gauge",
ty: "counter",
},
"servers_bytes_sent" => MetricHelpType {
help: "Volume in bytes of network traffic sent by server",
ty: "gauge",
ty: "counter",
},
"servers_transaction_count" => MetricHelpType {
help: "Number of transactions executed by server",
ty: "gauge",
ty: "counter",
},
"servers_query_count" => MetricHelpType {
help: "Number of queries executed by server",
ty: "gauge",
ty: "counter",
},
"servers_error_count" => MetricHelpType {
help: "Number of errors",
ty: "counter",
},
"servers_idle_count" => MetricHelpType {
help: "Number of server connection in idle state",
ty: "gauge",
},
"servers_active_count" => MetricHelpType {
help: "Number of server connection in active state",
ty: "gauge",
},
"servers_tested_count" => MetricHelpType {
help: "Number of server connection in tested state",
ty: "gauge",
},
"servers_login_count" => MetricHelpType {
help: "Number of server connection in login state",
ty: "gauge",
},
"servers_is_banned" => MetricHelpType {
help: "0 if server is not banned, 1 if server is banned",
ty: "gauge",
},
"servers_is_paused" => MetricHelpType {
help: "0 if server is not paused, 1 if server is paused",
ty: "gauge",
},
"databases_pool_size" => MetricHelpType {
@@ -210,7 +245,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels)
}
@@ -225,8 +262,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels)
}
@@ -236,7 +274,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels)
}
@@ -338,34 +378,51 @@ fn push_database_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW SERVERS admin command.
fn push_server_stats(lines: &mut Vec<String>) {
let server_stats = get_server_stats();
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
for (_, stats) in server_stats {
server_stats_by_addresses.insert(stats.address_name(), stats);
let entry = prom_stats
.entry(stats.address_name())
.or_insert(ServerPrometheusStats {
bytes_received: 0,
bytes_sent: 0,
transaction_count: 0,
query_count: 0,
error_count: 0,
active_count: 0,
idle_count: 0,
login_count: 0,
tested_count: 0,
});
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
entry.query_count += stats.query_count.load(Ordering::Relaxed);
entry.error_count += stats.error_count.load(Ordering::Relaxed);
match stats.state.load(Ordering::Relaxed) {
crate::stats::ServerState::Login => entry.login_count += 1,
crate::stats::ServerState::Active => entry.active_count += 1,
crate::stats::ServerState::Tested => entry.tested_count += 1,
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
}
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
if let Some(server_info) = prom_stats.get(&address.name()) {
let metrics = [
(
"bytes_received",
server_info.bytes_received.load(Ordering::Relaxed),
),
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
(
"transaction_count",
server_info.transaction_count.load(Ordering::Relaxed),
),
(
"query_count",
server_info.query_count.load(Ordering::Relaxed),
),
(
"error_count",
server_info.error_count.load(Ordering::Relaxed),
),
("bytes_received", server_info.bytes_received),
("bytes_sent", server_info.bytes_sent),
("transaction_count", server_info.transaction_count),
("query_count", server_info.query_count),
("error_count", server_info.error_count),
("idle_count", server_info.idle_count),
("active_count", server_info.active_count),
("login_count", server_info.login_count),
("tested_count", server_info.tested_count),
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
("is_paused", if pool.paused() { 1 } else { 0 }),
];
for (key, value) in metrics {
if let Some(prometheus_metric) =


@@ -1399,6 +1399,19 @@ mod test {
assert!(!qr.query_parser_enabled());
}
#[test]
fn test_query_parser() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
}
#[test]
fn test_update_from_pool_settings() {
QueryRouter::setup();

start_test_env.sh (new executable file, 34 changes)

@@ -0,0 +1,34 @@
GREEN="\033[0;32m"
RED="\033[0;31m"
BLUE="\033[0;34m"
RESET="\033[0m"
cd tests/docker/
docker compose kill main || true
docker compose build main
docker compose down
docker compose up -d
# wait for the container to start
while ! docker compose exec main ls; do
echo "Waiting for test environment to start"
sleep 1
done
echo "==================================="
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
docker compose exec --workdir /app main cargo build
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
docker compose exec --workdir /app/tests/ruby main bundle install
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
echo "You can rebuild PgCat from within the container by running"
echo -e " ${GREEN}cargo build${RESET}"
echo "and then run the tests again"
echo "==================================="
docker compose exec --workdir /app/tests main bash


@@ -2,7 +2,9 @@ FROM rust:bullseye
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 gnupg lsb-release -y
RUN env DEBIAN_FRONTEND=noninteractive apt-get -y install krb5-kdc krb5-admin-server krb5-user
RUN cargo install cargo-binutils rustfilt
RUN rustup component add llvm-tools-preview
RUN sudo gem install bundler


@@ -1,4 +1,3 @@
version: "3"
services:
pg1:
image: postgres:14
@@ -48,6 +47,8 @@ services:
main:
build: .
command: ["bash", "/app/tests/docker/run.sh"]
environment:
- INTERACTIVE_TEST_ENVIRONMENT=true
volumes:
- ../../:/app/
- /app/target/


@@ -5,6 +5,38 @@ rm /app/*.profraw || true
rm /app/pgcat.profdata || true
rm -rf /app/cov || true
# Prepares the interactive test environment
#
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
ports=(5432 7432 8432 9432 10432)
for port in "${ports[@]}"; do
is_it_up=0
attempts=0
while [ $is_it_up -eq 0 ]; do
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo "PostgreSQL on port $port is up."
is_it_up=1
else
attempts=$((attempts+1))
if [ $attempts -gt 10 ]; then
echo "PostgreSQL on port $port is down, giving up."
exit 1
fi
echo "PostgreSQL on port $port is down, waiting for it to start."
sleep 1
fi
done
done
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
sleep 100000000000000000
exit 0
fi
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
export RUSTC_BOOTSTRAP=1
export CARGO_INCREMENTAL=0

tests/python/conftest.py (new file, empty)


@@ -1,2 +1,3 @@
pytest
psycopg2==2.9.3
psutil==5.9.1
psutil==5.9.1

tests/python/test_krb.py (new file, 94 changes)

@@ -0,0 +1,94 @@
import signal
import socket
import subprocess
import utils
REALM = 'EXAMPLE.COM'
SUPPORTED_ENCRYPTION_TYPES = 'aes256-cts-hmac-sha1-96:normal'
KADMIN_PRINCIPAL = 'root'
KADMIN_PASSWORD = 'root'
KDC_KADMIN_SERVER = socket.gethostname()
LOGDIR = 'log'
PG_LOG = f'{LOGDIR}/krb.log'
# Assumes packages are installed; krb5-kdc and krb5-admin-server on debian
KADMIN_PRINCIPAL_FULL = f'{KADMIN_PRINCIPAL}@{REALM}'
MASTER_PASSWORD = 'master_password'
def setup_krb():
krb5_conf = f"""
[libdefaults]
default_realm = {REALM}
rdns = false
[realms]
{REALM} = {{
kdc_ports = 88,750
kadmind_port = 749
kdc = {KDC_KADMIN_SERVER}
admin_server = {KDC_KADMIN_SERVER}
}}
"""
with open("/etc/krb5.conf", "w") as text_file:
text_file.write(krb5_conf)
kdc_conf = f"""
[realms]
{REALM} = {{
acl_file = /etc/krb5kdc/kadm5.acl
max_renewable_life = 7d 0h 0m 0s
supported_enctypes = {SUPPORTED_ENCRYPTION_TYPES}
default_principal_flags = +preauth
}}
"""
with open("/etc/krb5kdc/kdc.conf", "w") as text_file:
text_file.write(kdc_conf)
kadm5_acl = f"""
{KADMIN_PRINCIPAL_FULL} *
"""
with open("/etc/krb5kdc/kadm5.acl", "w") as text_file:
text_file.write(kadm5_acl)
kerberos_command = f"""
krb5_newrealm <<EOF
{MASTER_PASSWORD}
{MASTER_PASSWORD}
EOF
"""
subprocess.run(kerberos_command, check=False, shell=True)
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
subprocess.run(delete_principal, check=True, shell=True)
create_principal = f'kadmin.local -q "addprinc -pw {KADMIN_PASSWORD} {KADMIN_PRINCIPAL_FULL}"'
subprocess.run(create_principal, check=True, shell=True)
kinit_command = f'echo {KADMIN_PASSWORD} | kinit'
subprocess.run(kinit_command, check=True, shell=True)
utils.pgcat_start()
def teardown_krb():
subprocess.run('kdestroy', check=True, shell=True)
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
subprocess.run(delete_principal, check=True, shell=True)
utils.pg_cat_send_signal(signal.SIGINT)
def test_krb():
setup_krb()
# TODO test connect to database
utils.pgcat_start()
conn, cur = utils.connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
teardown_krb()
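The Kerberos packages this test relies on (krb5-kdc, krb5-admin-server, krb5-user) are installed by the Dockerfile change above, so inside the test container the test can be run on its own, e.g.:

```sh
cd /app && pytest tests/python/test_krb.py
```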


@@ -1,83 +1,29 @@
from typing import Tuple
import psycopg2
import psutil
import os
import signal
import time
import psycopg2
import utils
SHUTDOWN_TIMEOUT = 5
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()
def test_normal_db_access():
pgcat_start()
conn, cur = connect_db(autocommit=False)
utils.pgcat_start()
conn, cur = utils.connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
utils.cleanup_conn(conn, cur)
def test_admin_db_access():
conn, cur = connect_db(admin=True)
conn, cur = utils.connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
utils.cleanup_conn(conn, cur)
def test_shutdown_logic():
@@ -86,17 +32,17 @@ def test_shutdown_logic():
# NO ACTIVE QUERIES SIGINT HANDLING
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and send query (not in transaction)
conn, cur = connect_db()
conn, cur = utils.connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
cur.execute("COMMIT;")
# Send sigint to pgcat
pg_cat_send_signal(signal.SIGINT)
utils.pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries fail after sigint since server should close with no active transactions
@@ -108,18 +54,18 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and begin transaction
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -138,24 +84,24 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH SIGINT
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and begin transaction
conn, cur = connect_db()
conn, cur = utils.connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
utils.pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries succeed after sigint since server should still allow transaction to complete
@@ -165,18 +111,18 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and begin transaction
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -194,30 +140,30 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = connect_db()
transaction_conn, transaction_cur = utils.connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
utils.pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
start = time.perf_counter()
try:
conn, cur = connect_db()
conn, cur = utils.connect_db()
cur.execute("SELECT 1;")
cleanup_conn(conn, cur)
utils.cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
time_taken = time.perf_counter() - start
if time_taken > 0.1:
@@ -227,49 +173,49 @@ def test_shutdown_logic():
else:
raise Exception("Able connect to database during shutdown")
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = connect_db()
transaction_conn, transaction_cur = utils.connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
utils.pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
conn, cur = connect_db(admin=True)
conn, cur = utils.connect_db(admin=True)
cur.execute("SHOW DATABASES;")
cleanup_conn(conn, cur)
utils.cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
raise Exception(e)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = connect_db()
transaction_conn, transaction_cur = utils.connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
admin_conn, admin_cur = connect_db(admin=True)
admin_conn, admin_cur = utils.connect_db(admin=True)
admin_cur.execute("SHOW DATABASES;")
# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
utils.pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
@@ -277,24 +223,24 @@ def test_shutdown_logic():
except psycopg2.OperationalError as e:
raise Exception("Could not execute admin command:", e)
cleanup_conn(transaction_conn, transaction_cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
# Start pgcat
pgcat_start()
utils.pgcat_start()
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
conn, cur = connect_db()
conn, cur = utils.connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
pg_cat_send_signal(signal.SIGINT)
utils.pg_cat_send_signal(signal.SIGINT)
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
time.sleep(SHUTDOWN_TIMEOUT + 1)
@@ -308,12 +254,5 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint and expected timeout")
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
test_normal_db_access()
test_admin_db_access()
test_shutdown_logic()
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)

tests/python/utils.py (new file, 60 changes)

@@ -0,0 +1,60 @@
from typing import Tuple
import os
import psutil
import signal
import time
import psycopg2
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()

tests/rust/Cargo.lock (generated, 682 changes)

File diff suppressed because it is too large.


@@ -15,13 +15,11 @@ async fn test_prepared_statements() {
for _ in 0..5 {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for _ in 0..1000 {
match sqlx::query("SELECT one").fetch_all(&pool).await {
for i in 0..1000 {
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
Ok(_) => (),
Err(err) => {
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err);
}
panic!("prepared statement error: {}", err);
}
}
}