mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
5 Commits
circleci_A
...
circleci_p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b24e5395be | ||
|
|
53b6b56563 | ||
|
|
1cc71f3990 | ||
|
|
baa6f661aa | ||
|
|
ae2b83d239 |
@@ -107,7 +107,7 @@ cd ../..
|
||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||
#
|
||||
pip3 install -r tests/python/requirements.txt
|
||||
pytest || exit 1
|
||||
python3 tests/python/tests.py || exit 1
|
||||
|
||||
|
||||
#
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -10,5 +10,4 @@ lcov.info
|
||||
dev/.bash_history
|
||||
dev/cache
|
||||
!dev/cache/.keepme
|
||||
.venv
|
||||
**/__pycache__
|
||||
.venv
|
||||
@@ -7,7 +7,7 @@ Thank you for contributing! Just a few tips here:
|
||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||
|
||||
## How to run the integration tests locally and iterate on them
|
||||
We have integration tests written in Ruby, Python, Go and Rust.
|
||||
We have integration tests written in Ruby, Python, Go and Rust.
|
||||
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
||||
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
||||
|
||||
@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
|
||||
|
||||
Once the environment is ready, you can run the tests by running
|
||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||
Python: `cd /app/ && pytest`
|
||||
Python: `cd /app && python3 tests/python/tests.py`
|
||||
Rust: `cd /app/tests/rust && cargo run`
|
||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||
|
||||
|
||||
@@ -346,14 +346,6 @@ where
|
||||
// Client is requesting to cancel a running query (plain text connection).
|
||||
CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)),
|
||||
|
||||
// Client is requesting a GSS encoded connection
|
||||
GSSENC_REQUEST_CODE => {
|
||||
error_response_terminal(stream, "").await?;
|
||||
Err(Error::ProtocolSyncError(
|
||||
"PGCat does not support GSSAPI encoding".into(),
|
||||
))
|
||||
}
|
||||
|
||||
// Something else, probably something is wrong and it's not our fault,
|
||||
// e.g. badly implemented Postgres client.
|
||||
_ => Err(Error::ProtocolSyncError(format!(
|
||||
|
||||
@@ -11,9 +11,6 @@ pub const SSL_REQUEST_CODE: i32 = 80877103;
|
||||
// CancelRequest: the cancel request code.
|
||||
pub const CANCEL_REQUEST_CODE: i32 = 80877102;
|
||||
|
||||
// GSSENCRequest: used to indicate we wants GSS connection
|
||||
pub const GSSENC_REQUEST_CODE: i32 = 80877104;
|
||||
|
||||
// AuthenticationMD5Password
|
||||
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
|
||||
|
||||
|
||||
@@ -200,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {
|
||||
|
||||
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let formatted_labels = self
|
||||
.labels
|
||||
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
|
||||
sorted_labels.sort_by_key(|&(key, _)| key);
|
||||
let formatted_labels = sorted_labels
|
||||
.iter()
|
||||
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
write!(
|
||||
f,
|
||||
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
|
||||
"{name}{{{formatted_labels}}} {value}",
|
||||
name = format_args!("pgcat_{}", self.name),
|
||||
help = self.help,
|
||||
ty = self.ty,
|
||||
formatted_labels = formatted_labels,
|
||||
value = self.value
|
||||
)
|
||||
@@ -247,7 +246,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("user", address.username.clone());
|
||||
labels.insert("username", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||
}
|
||||
@@ -264,7 +263,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("user", address.username.clone());
|
||||
labels.insert("username", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||
}
|
||||
|
||||
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("user", address.username.clone());
|
||||
labels.insert("username", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||
}
|
||||
@@ -288,6 +288,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
|
||||
Self::from_name(&format!("pools_{}", name), value, labels)
|
||||
}
|
||||
|
||||
fn get_header(&self) -> String {
|
||||
format!(
|
||||
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
|
||||
name = format_args!("pgcat_{}", self.name),
|
||||
help = self.help,
|
||||
ty = self.ty,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn prometheus_stats(
|
||||
@@ -313,6 +322,7 @@ async fn prometheus_stats(
|
||||
|
||||
// Adds metrics shown in a SHOW STATS admin command.
|
||||
fn push_address_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
@@ -322,7 +332,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_address(address, &key, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(key)
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -330,33 +343,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
||||
fn push_pool_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
let pool_stats = PoolStats::construct_pool_lookup();
|
||||
for (pool_id, stats) in pool_stats.iter() {
|
||||
for (name, value) in stats.clone() {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(name)
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
||||
fn push_database_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
let pool_config = pool.settings.clone();
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let pool_state = pool.pool_state(shard, server);
|
||||
|
||||
let metrics = vec![
|
||||
("pool_size", pool_config.user.pool_size),
|
||||
("current_connections", pool_state.connections),
|
||||
@@ -365,7 +398,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(key.to_string())
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -373,6 +409,14 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||
@@ -405,7 +449,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
||||
}
|
||||
}
|
||||
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
@@ -428,7 +472,10 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
||||
{
|
||||
lines.push(prometheus_metric.to_string());
|
||||
grouped_metrics
|
||||
.entry(key.to_string())
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -437,6 +484,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||
|
||||
@@ -23,11 +23,11 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
|
||||
echo "Interactive test environment ready"
|
||||
echo "To run integration tests, you can use the following commands:"
|
||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
||||
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
|
||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||
echo "You can rebuild PgCat from within the container by running"
|
||||
echo "You can rebuild PgCat from within the container by running"
|
||||
echo -e " ${GREEN}cargo build${RESET}"
|
||||
echo "and then run the tests again"
|
||||
echo "==================================="
|
||||
|
||||
@@ -2,9 +2,7 @@ FROM rust:bullseye
|
||||
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
RUN /bin/yj -h
|
||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 gnupg lsb-release -y
|
||||
RUN env DEBIAN_FRONTEND=noninteractive apt-get -y install krb5-kdc krb5-admin-server krb5-user
|
||||
|
||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y
|
||||
RUN cargo install cargo-binutils rustfilt
|
||||
RUN rustup component add llvm-tools-preview
|
||||
RUN sudo gem install bundler
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
pytest
|
||||
psycopg2==2.9.3
|
||||
psutil==5.9.1
|
||||
psutil==5.9.1
|
||||
@@ -1,94 +0,0 @@
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import utils
|
||||
|
||||
REALM = 'EXAMPLE.COM'
|
||||
SUPPORTED_ENCRYPTION_TYPES = 'aes256-cts-hmac-sha1-96:normal'
|
||||
KADMIN_PRINCIPAL = 'root'
|
||||
KADMIN_PASSWORD = 'root'
|
||||
KDC_KADMIN_SERVER = socket.gethostname()
|
||||
|
||||
LOGDIR = 'log'
|
||||
PG_LOG = f'{LOGDIR}/krb.log'
|
||||
# Assumes packages are installed; krb5-kdc and krb5-admin-server on debian
|
||||
KADMIN_PRINCIPAL_FULL = f'{KADMIN_PRINCIPAL}@{REALM}'
|
||||
MASTER_PASSWORD = 'master_password'
|
||||
|
||||
|
||||
def setup_krb():
|
||||
krb5_conf = f"""
|
||||
[libdefaults]
|
||||
default_realm = {REALM}
|
||||
rdns = false
|
||||
|
||||
[realms]
|
||||
{REALM} = {{
|
||||
kdc_ports = 88,750
|
||||
kadmind_port = 749
|
||||
kdc = {KDC_KADMIN_SERVER}
|
||||
admin_server = {KDC_KADMIN_SERVER}
|
||||
}}
|
||||
"""
|
||||
with open("/etc/krb5.conf", "w") as text_file:
|
||||
text_file.write(krb5_conf)
|
||||
|
||||
kdc_conf = f"""
|
||||
[realms]
|
||||
{REALM} = {{
|
||||
acl_file = /etc/krb5kdc/kadm5.acl
|
||||
max_renewable_life = 7d 0h 0m 0s
|
||||
supported_enctypes = {SUPPORTED_ENCRYPTION_TYPES}
|
||||
default_principal_flags = +preauth
|
||||
}}
|
||||
"""
|
||||
with open("/etc/krb5kdc/kdc.conf", "w") as text_file:
|
||||
text_file.write(kdc_conf)
|
||||
|
||||
kadm5_acl = f"""
|
||||
{KADMIN_PRINCIPAL_FULL} *
|
||||
"""
|
||||
with open("/etc/krb5kdc/kadm5.acl", "w") as text_file:
|
||||
text_file.write(kadm5_acl)
|
||||
|
||||
kerberos_command = f"""
|
||||
krb5_newrealm <<EOF
|
||||
{MASTER_PASSWORD}
|
||||
{MASTER_PASSWORD}
|
||||
EOF
|
||||
"""
|
||||
subprocess.run(kerberos_command, check=False, shell=True)
|
||||
|
||||
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
|
||||
subprocess.run(delete_principal, check=True, shell=True)
|
||||
|
||||
create_principal = f'kadmin.local -q "addprinc -pw {KADMIN_PASSWORD} {KADMIN_PRINCIPAL_FULL}"'
|
||||
subprocess.run(create_principal, check=True, shell=True)
|
||||
|
||||
kinit_command = f'echo {KADMIN_PASSWORD} | kinit'
|
||||
subprocess.run(kinit_command, check=True, shell=True)
|
||||
|
||||
utils.pgcat_start()
|
||||
|
||||
|
||||
def teardown_krb():
|
||||
subprocess.run('kdestroy', check=True, shell=True)
|
||||
|
||||
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
|
||||
subprocess.run(delete_principal, check=True, shell=True)
|
||||
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
|
||||
|
||||
def test_krb():
|
||||
setup_krb()
|
||||
# TODO test connect to database
|
||||
|
||||
utils.pgcat_start()
|
||||
conn, cur = utils.connect_db(autocommit=False)
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
|
||||
teardown_krb()
|
||||
@@ -1,29 +1,83 @@
|
||||
from typing import Tuple
|
||||
import psycopg2
|
||||
import psutil
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
|
||||
import psycopg2
|
||||
|
||||
import utils
|
||||
|
||||
SHUTDOWN_TIMEOUT = 5
|
||||
|
||||
PGCAT_HOST = "127.0.0.1"
|
||||
PGCAT_PORT = "6432"
|
||||
|
||||
|
||||
def pgcat_start():
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
if not os.system('pgrep pgcat'):
|
||||
raise Exception("pgcat not closed after SIGTERM")
|
||||
|
||||
|
||||
def connect_db(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
password = "admin_pass"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
password = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
|
||||
return (conn, cur)
|
||||
|
||||
|
||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_normal_db_access():
|
||||
utils.pgcat_start()
|
||||
conn, cur = utils.connect_db(autocommit=False)
|
||||
pgcat_start()
|
||||
conn, cur = connect_db(autocommit=False)
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
cleanup_conn(conn, cur)
|
||||
|
||||
|
||||
def test_admin_db_access():
|
||||
conn, cur = utils.connect_db(admin=True)
|
||||
conn, cur = connect_db(admin=True)
|
||||
|
||||
cur.execute("SHOW POOLS")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
cleanup_conn(conn, cur)
|
||||
|
||||
|
||||
def test_shutdown_logic():
|
||||
@@ -32,17 +86,17 @@ def test_shutdown_logic():
|
||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and send query (not in transaction)
|
||||
conn, cur = utils.connect_db()
|
||||
conn, cur = connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
cur.execute("COMMIT;")
|
||||
|
||||
# Send sigint to pgcat
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||
@@ -54,18 +108,18 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint")
|
||||
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = utils.connect_db()
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
conn, cur = connect_db()
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -84,24 +138,24 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint")
|
||||
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(conn, cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE TRANSACTION WITH SIGINT
|
||||
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = utils.connect_db()
|
||||
conn, cur = connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||
@@ -111,18 +165,18 @@ def test_shutdown_logic():
|
||||
# Fail if query fails since server closed
|
||||
raise Exception("Server closed while in transaction", e.pgerror)
|
||||
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = utils.connect_db()
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
conn, cur = connect_db()
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -140,30 +194,30 @@ def test_shutdown_logic():
|
||||
# Fail if query fails since server closed
|
||||
raise Exception("Server closed while in transaction", e.pgerror)
|
||||
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(conn, cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
conn, cur = utils.connect_db()
|
||||
conn, cur = connect_db()
|
||||
cur.execute("SELECT 1;")
|
||||
utils.cleanup_conn(conn, cur)
|
||||
cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
time_taken = time.perf_counter() - start
|
||||
if time_taken > 0.1:
|
||||
@@ -173,49 +227,49 @@ def test_shutdown_logic():
|
||||
else:
|
||||
raise Exception("Able connect to database during shutdown")
|
||||
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
conn, cur = utils.connect_db(admin=True)
|
||||
conn, cur = connect_db(admin=True)
|
||||
cur.execute("SHOW DATABASES;")
|
||||
utils.cleanup_conn(conn, cur)
|
||||
cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception(e)
|
||||
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
admin_cur.execute("SHOW DATABASES;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
@@ -223,24 +277,24 @@ def test_shutdown_logic():
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception("Could not execute admin command:", e)
|
||||
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||
|
||||
# Start pgcat
|
||||
utils.pgcat_start()
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||
conn, cur = utils.connect_db()
|
||||
conn, cur = connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
|
||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||
@@ -254,5 +308,12 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint and expected timeout")
|
||||
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
|
||||
|
||||
test_normal_db_access()
|
||||
test_admin_db_access()
|
||||
test_shutdown_logic()
|
||||
@@ -1,60 +0,0 @@
|
||||
from typing import Tuple
|
||||
import os
|
||||
import psutil
|
||||
import signal
|
||||
import time
|
||||
|
||||
import psycopg2
|
||||
|
||||
PGCAT_HOST = "127.0.0.1"
|
||||
PGCAT_PORT = "6432"
|
||||
|
||||
def pgcat_start():
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
if not os.system('pgrep pgcat'):
|
||||
raise Exception("pgcat not closed after SIGTERM")
|
||||
|
||||
|
||||
def connect_db(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
password = "admin_pass"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
password = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
|
||||
return (conn, cur)
|
||||
|
||||
|
||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||
cur.close()
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user