mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
5 Commits
circleci_A
...
circleci_A
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d404250fb8 | ||
|
|
081b9f74e9 | ||
|
|
d4e8ff27e7 | ||
|
|
7d047c6c19 | ||
|
|
f73d15f82c |
@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||
|
||||
# Start Toxiproxy
|
||||
kill -9 $(pgrep toxiproxy) || true
|
||||
LOG_LEVEL=error toxiproxy-server &
|
||||
sleep 1
|
||||
|
||||
@@ -106,7 +107,7 @@ cd ../..
|
||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||
#
|
||||
pip3 install -r tests/python/requirements.txt
|
||||
python3 tests/python/tests.py || exit 1
|
||||
pytest || exit 1
|
||||
|
||||
|
||||
#
|
||||
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
|
||||
|
||||
# Allow for graceful shutdown
|
||||
sleep 1
|
||||
|
||||
kill -9 $(pgrep toxiproxy)
|
||||
sleep 1
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -10,4 +10,5 @@ lcov.info
|
||||
dev/.bash_history
|
||||
dev/cache
|
||||
!dev/cache/.keepme
|
||||
.venv
|
||||
.venv
|
||||
**/__pycache__
|
||||
|
||||
@@ -7,7 +7,7 @@ Thank you for contributing! Just a few tips here:
|
||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||
|
||||
## How to run the integration tests locally and iterate on them
|
||||
We have integration tests written in Ruby, Python, Go and Rust.
|
||||
We have integration tests written in Ruby, Python, Go and Rust.
|
||||
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
||||
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
||||
|
||||
@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
|
||||
|
||||
Once the environment is ready, you can run the tests by running
|
||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||
Python: `cd /app && python3 tests/python/tests.py`
|
||||
Python: `cd /app/ && pytest`
|
||||
Rust: `cd /app/tests/rust && cargo run`
|
||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||
|
||||
|
||||
@@ -346,6 +346,14 @@ where
|
||||
// Client is requesting to cancel a running query (plain text connection).
|
||||
CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)),
|
||||
|
||||
// Client is requesting a GSS encoded connection
|
||||
GSSENC_REQUEST_CODE => {
|
||||
error_response_terminal(stream, "").await?;
|
||||
Err(Error::ProtocolSyncError(
|
||||
"PGCat does not support GSSAPI encoding".into(),
|
||||
))
|
||||
}
|
||||
|
||||
// Something else, probably something is wrong and it's not our fault,
|
||||
// e.g. badly implemented Postgres client.
|
||||
_ => Err(Error::ProtocolSyncError(format!(
|
||||
|
||||
@@ -11,6 +11,9 @@ pub const SSL_REQUEST_CODE: i32 = 80877103;
|
||||
// CancelRequest: the cancel request code.
|
||||
pub const CANCEL_REQUEST_CODE: i32 = 80877102;
|
||||
|
||||
// GSSENCRequest: used to indicate we wants GSS connection
|
||||
pub const GSSENC_REQUEST_CODE: i32 = 80877104;
|
||||
|
||||
// AuthenticationMD5Password
|
||||
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
|
||||
|
||||
|
||||
@@ -23,11 +23,11 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
|
||||
echo "Interactive test environment ready"
|
||||
echo "To run integration tests, you can use the following commands:"
|
||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
|
||||
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||
echo "You can rebuild PgCat from within the container by running"
|
||||
echo "You can rebuild PgCat from within the container by running"
|
||||
echo -e " ${GREEN}cargo build${RESET}"
|
||||
echo "and then run the tests again"
|
||||
echo "==================================="
|
||||
|
||||
@@ -2,7 +2,9 @@ FROM rust:bullseye
|
||||
|
||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||
RUN /bin/yj -h
|
||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y
|
||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 gnupg lsb-release -y
|
||||
RUN env DEBIAN_FRONTEND=noninteractive apt-get -y install krb5-kdc krb5-admin-server krb5-user
|
||||
|
||||
RUN cargo install cargo-binutils rustfilt
|
||||
RUN rustup component add llvm-tools-preview
|
||||
RUN sudo gem install bundler
|
||||
|
||||
0
tests/python/conftest.py
Normal file
0
tests/python/conftest.py
Normal file
@@ -1,2 +1,3 @@
|
||||
pytest
|
||||
psycopg2==2.9.3
|
||||
psutil==5.9.1
|
||||
psutil==5.9.1
|
||||
|
||||
94
tests/python/test_krb.py
Normal file
94
tests/python/test_krb.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import utils
|
||||
|
||||
REALM = 'EXAMPLE.COM'
|
||||
SUPPORTED_ENCRYPTION_TYPES = 'aes256-cts-hmac-sha1-96:normal'
|
||||
KADMIN_PRINCIPAL = 'root'
|
||||
KADMIN_PASSWORD = 'root'
|
||||
KDC_KADMIN_SERVER = socket.gethostname()
|
||||
|
||||
LOGDIR = 'log'
|
||||
PG_LOG = f'{LOGDIR}/krb.log'
|
||||
# Assumes packages are installed; krb5-kdc and krb5-admin-server on debian
|
||||
KADMIN_PRINCIPAL_FULL = f'{KADMIN_PRINCIPAL}@{REALM}'
|
||||
MASTER_PASSWORD = 'master_password'
|
||||
|
||||
|
||||
def setup_krb():
|
||||
krb5_conf = f"""
|
||||
[libdefaults]
|
||||
default_realm = {REALM}
|
||||
rdns = false
|
||||
|
||||
[realms]
|
||||
{REALM} = {{
|
||||
kdc_ports = 88,750
|
||||
kadmind_port = 749
|
||||
kdc = {KDC_KADMIN_SERVER}
|
||||
admin_server = {KDC_KADMIN_SERVER}
|
||||
}}
|
||||
"""
|
||||
with open("/etc/krb5.conf", "w") as text_file:
|
||||
text_file.write(krb5_conf)
|
||||
|
||||
kdc_conf = f"""
|
||||
[realms]
|
||||
{REALM} = {{
|
||||
acl_file = /etc/krb5kdc/kadm5.acl
|
||||
max_renewable_life = 7d 0h 0m 0s
|
||||
supported_enctypes = {SUPPORTED_ENCRYPTION_TYPES}
|
||||
default_principal_flags = +preauth
|
||||
}}
|
||||
"""
|
||||
with open("/etc/krb5kdc/kdc.conf", "w") as text_file:
|
||||
text_file.write(kdc_conf)
|
||||
|
||||
kadm5_acl = f"""
|
||||
{KADMIN_PRINCIPAL_FULL} *
|
||||
"""
|
||||
with open("/etc/krb5kdc/kadm5.acl", "w") as text_file:
|
||||
text_file.write(kadm5_acl)
|
||||
|
||||
kerberos_command = f"""
|
||||
krb5_newrealm <<EOF
|
||||
{MASTER_PASSWORD}
|
||||
{MASTER_PASSWORD}
|
||||
EOF
|
||||
"""
|
||||
subprocess.run(kerberos_command, check=False, shell=True)
|
||||
|
||||
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
|
||||
subprocess.run(delete_principal, check=True, shell=True)
|
||||
|
||||
create_principal = f'kadmin.local -q "addprinc -pw {KADMIN_PASSWORD} {KADMIN_PRINCIPAL_FULL}"'
|
||||
subprocess.run(create_principal, check=True, shell=True)
|
||||
|
||||
kinit_command = f'echo {KADMIN_PASSWORD} | kinit'
|
||||
subprocess.run(kinit_command, check=True, shell=True)
|
||||
|
||||
utils.pgcat_start()
|
||||
|
||||
|
||||
def teardown_krb():
|
||||
subprocess.run('kdestroy', check=True, shell=True)
|
||||
|
||||
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
|
||||
subprocess.run(delete_principal, check=True, shell=True)
|
||||
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
|
||||
|
||||
def test_krb():
|
||||
setup_krb()
|
||||
# TODO test connect to database
|
||||
|
||||
utils.pgcat_start()
|
||||
conn, cur = utils.connect_db(autocommit=False)
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
|
||||
teardown_krb()
|
||||
@@ -1,83 +1,29 @@
|
||||
from typing import Tuple
|
||||
import psycopg2
|
||||
import psutil
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
|
||||
import psycopg2
|
||||
|
||||
import utils
|
||||
|
||||
SHUTDOWN_TIMEOUT = 5
|
||||
|
||||
PGCAT_HOST = "127.0.0.1"
|
||||
PGCAT_PORT = "6432"
|
||||
|
||||
|
||||
def pgcat_start():
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
if not os.system('pgrep pgcat'):
|
||||
raise Exception("pgcat not closed after SIGTERM")
|
||||
|
||||
|
||||
def connect_db(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
password = "admin_pass"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
password = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
|
||||
return (conn, cur)
|
||||
|
||||
|
||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_normal_db_access():
|
||||
pgcat_start()
|
||||
conn, cur = connect_db(autocommit=False)
|
||||
utils.pgcat_start()
|
||||
conn, cur = utils.connect_db(autocommit=False)
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
|
||||
|
||||
def test_admin_db_access():
|
||||
conn, cur = connect_db(admin=True)
|
||||
conn, cur = utils.connect_db(admin=True)
|
||||
|
||||
cur.execute("SHOW POOLS")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
|
||||
|
||||
def test_shutdown_logic():
|
||||
@@ -86,17 +32,17 @@ def test_shutdown_logic():
|
||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and send query (not in transaction)
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
cur.execute("COMMIT;")
|
||||
|
||||
# Send sigint to pgcat
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||
@@ -108,18 +54,18 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint")
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = connect_db()
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
conn, cur = utils.connect_db()
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -138,24 +84,24 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint")
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE TRANSACTION WITH SIGINT
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||
@@ -165,18 +111,18 @@ def test_shutdown_logic():
|
||||
# Fail if query fails since server closed
|
||||
raise Exception("Server closed while in transaction", e.pgerror)
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = connect_db()
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
conn, cur = utils.connect_db()
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -194,30 +140,30 @@ def test_shutdown_logic():
|
||||
# Fail if query fails since server closed
|
||||
raise Exception("Server closed while in transaction", e.pgerror)
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
cur.execute("SELECT 1;")
|
||||
cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
time_taken = time.perf_counter() - start
|
||||
if time_taken > 0.1:
|
||||
@@ -227,49 +173,49 @@ def test_shutdown_logic():
|
||||
else:
|
||||
raise Exception("Able connect to database during shutdown")
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
conn, cur = connect_db(admin=True)
|
||||
conn, cur = utils.connect_db(admin=True)
|
||||
cur.execute("SHOW DATABASES;")
|
||||
cleanup_conn(conn, cur)
|
||||
utils.cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception(e)
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_conn, transaction_cur = utils.connect_db()
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||
admin_cur.execute("SHOW DATABASES;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
@@ -277,24 +223,24 @@ def test_shutdown_logic():
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception("Could not execute admin command:", e)
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||
utils.cleanup_conn(admin_conn, admin_cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
utils.pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||
conn, cur = connect_db()
|
||||
conn, cur = utils.connect_db()
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
utils.pg_cat_send_signal(signal.SIGINT)
|
||||
|
||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||
@@ -308,12 +254,5 @@ def test_shutdown_logic():
|
||||
# Fail if query execution succeeded
|
||||
raise Exception("Server not closed after sigint and expected timeout")
|
||||
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
|
||||
|
||||
test_normal_db_access()
|
||||
test_admin_db_access()
|
||||
test_shutdown_logic()
|
||||
utils.cleanup_conn(conn, cur)
|
||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||
60
tests/python/utils.py
Normal file
60
tests/python/utils.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from typing import Tuple
|
||||
import os
|
||||
import psutil
|
||||
import signal
|
||||
import time
|
||||
|
||||
import psycopg2
|
||||
|
||||
PGCAT_HOST = "127.0.0.1"
|
||||
PGCAT_PORT = "6432"
|
||||
|
||||
def pgcat_start():
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
if not os.system('pgrep pgcat'):
|
||||
raise Exception("pgcat not closed after SIGTERM")
|
||||
|
||||
|
||||
def connect_db(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
password = "admin_pass"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
password = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
|
||||
return (conn, cur)
|
||||
|
||||
|
||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||
cur.close()
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user