mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-25 10:06:28 +00:00
Compare commits
6 Commits
mostafa_de
...
circleci_A
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d404250fb8 | ||
|
|
081b9f74e9 | ||
|
|
d4e8ff27e7 | ||
|
|
7d047c6c19 | ||
|
|
f73d15f82c | ||
|
|
69af6cc5e5 |
@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
|||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||||
|
|
||||||
# Start Toxiproxy
|
# Start Toxiproxy
|
||||||
|
kill -9 $(pgrep toxiproxy) || true
|
||||||
LOG_LEVEL=error toxiproxy-server &
|
LOG_LEVEL=error toxiproxy-server &
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
@@ -106,7 +107,7 @@ cd ../..
|
|||||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||||
#
|
#
|
||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
python3 tests/python/tests.py || exit 1
|
pytest || exit 1
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
|
|||||||
|
|
||||||
# Allow for graceful shutdown
|
# Allow for graceful shutdown
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
|
kill -9 $(pgrep toxiproxy)
|
||||||
|
sleep 1
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -11,3 +11,4 @@ dev/.bash_history
|
|||||||
dev/cache
|
dev/cache
|
||||||
!dev/cache/.keepme
|
!dev/cache/.keepme
|
||||||
.venv
|
.venv
|
||||||
|
**/__pycache__
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
|
|||||||
|
|
||||||
Once the environment is ready, you can run the tests by running
|
Once the environment is ready, you can run the tests by running
|
||||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||||
Python: `cd /app && python3 tests/python/tests.py`
|
Python: `cd /app/ && pytest`
|
||||||
Rust: `cd /app/tests/rust && cargo run`
|
Rust: `cd /app/tests/rust && cargo run`
|
||||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||||
|
|
||||||
|
|||||||
@@ -346,6 +346,14 @@ where
|
|||||||
// Client is requesting to cancel a running query (plain text connection).
|
// Client is requesting to cancel a running query (plain text connection).
|
||||||
CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)),
|
CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)),
|
||||||
|
|
||||||
|
// Client is requesting a GSS encoded connection
|
||||||
|
GSSENC_REQUEST_CODE => {
|
||||||
|
error_response_terminal(stream, "").await?;
|
||||||
|
Err(Error::ProtocolSyncError(
|
||||||
|
"PGCat does not support GSSAPI encoding".into(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
// Something else, probably something is wrong and it's not our fault,
|
// Something else, probably something is wrong and it's not our fault,
|
||||||
// e.g. badly implemented Postgres client.
|
// e.g. badly implemented Postgres client.
|
||||||
_ => Err(Error::ProtocolSyncError(format!(
|
_ => Err(Error::ProtocolSyncError(format!(
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ pub const SSL_REQUEST_CODE: i32 = 80877103;
|
|||||||
// CancelRequest: the cancel request code.
|
// CancelRequest: the cancel request code.
|
||||||
pub const CANCEL_REQUEST_CODE: i32 = 80877102;
|
pub const CANCEL_REQUEST_CODE: i32 = 80877102;
|
||||||
|
|
||||||
|
// GSSENCRequest: used to indicate we wants GSS connection
|
||||||
|
pub const GSSENC_REQUEST_CODE: i32 = 80877104;
|
||||||
|
|
||||||
// AuthenticationMD5Password
|
// AuthenticationMD5Password
|
||||||
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
|
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
|
||||||
|
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
|
|||||||
echo "Interactive test environment ready"
|
echo "Interactive test environment ready"
|
||||||
echo "To run integration tests, you can use the following commands:"
|
echo "To run integration tests, you can use the following commands:"
|
||||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||||
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
|
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
||||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||||
|
|||||||
@@ -2,7 +2,9 @@ FROM rust:bullseye
|
|||||||
|
|
||||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||||
RUN /bin/yj -h
|
RUN /bin/yj -h
|
||||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y
|
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 gnupg lsb-release -y
|
||||||
|
RUN env DEBIAN_FRONTEND=noninteractive apt-get -y install krb5-kdc krb5-admin-server krb5-user
|
||||||
|
|
||||||
RUN cargo install cargo-binutils rustfilt
|
RUN cargo install cargo-binutils rustfilt
|
||||||
RUN rustup component add llvm-tools-preview
|
RUN rustup component add llvm-tools-preview
|
||||||
RUN sudo gem install bundler
|
RUN sudo gem install bundler
|
||||||
|
|||||||
0
tests/python/conftest.py
Normal file
0
tests/python/conftest.py
Normal file
@@ -1,2 +1,3 @@
|
|||||||
|
pytest
|
||||||
psycopg2==2.9.3
|
psycopg2==2.9.3
|
||||||
psutil==5.9.1
|
psutil==5.9.1
|
||||||
94
tests/python/test_krb.py
Normal file
94
tests/python/test_krb.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
import signal
|
||||||
|
import socket
|
||||||
|
import subprocess
|
||||||
|
import utils
|
||||||
|
|
||||||
|
REALM = 'EXAMPLE.COM'
|
||||||
|
SUPPORTED_ENCRYPTION_TYPES = 'aes256-cts-hmac-sha1-96:normal'
|
||||||
|
KADMIN_PRINCIPAL = 'root'
|
||||||
|
KADMIN_PASSWORD = 'root'
|
||||||
|
KDC_KADMIN_SERVER = socket.gethostname()
|
||||||
|
|
||||||
|
LOGDIR = 'log'
|
||||||
|
PG_LOG = f'{LOGDIR}/krb.log'
|
||||||
|
# Assumes packages are installed; krb5-kdc and krb5-admin-server on debian
|
||||||
|
KADMIN_PRINCIPAL_FULL = f'{KADMIN_PRINCIPAL}@{REALM}'
|
||||||
|
MASTER_PASSWORD = 'master_password'
|
||||||
|
|
||||||
|
|
||||||
|
def setup_krb():
|
||||||
|
krb5_conf = f"""
|
||||||
|
[libdefaults]
|
||||||
|
default_realm = {REALM}
|
||||||
|
rdns = false
|
||||||
|
|
||||||
|
[realms]
|
||||||
|
{REALM} = {{
|
||||||
|
kdc_ports = 88,750
|
||||||
|
kadmind_port = 749
|
||||||
|
kdc = {KDC_KADMIN_SERVER}
|
||||||
|
admin_server = {KDC_KADMIN_SERVER}
|
||||||
|
}}
|
||||||
|
"""
|
||||||
|
with open("/etc/krb5.conf", "w") as text_file:
|
||||||
|
text_file.write(krb5_conf)
|
||||||
|
|
||||||
|
kdc_conf = f"""
|
||||||
|
[realms]
|
||||||
|
{REALM} = {{
|
||||||
|
acl_file = /etc/krb5kdc/kadm5.acl
|
||||||
|
max_renewable_life = 7d 0h 0m 0s
|
||||||
|
supported_enctypes = {SUPPORTED_ENCRYPTION_TYPES}
|
||||||
|
default_principal_flags = +preauth
|
||||||
|
}}
|
||||||
|
"""
|
||||||
|
with open("/etc/krb5kdc/kdc.conf", "w") as text_file:
|
||||||
|
text_file.write(kdc_conf)
|
||||||
|
|
||||||
|
kadm5_acl = f"""
|
||||||
|
{KADMIN_PRINCIPAL_FULL} *
|
||||||
|
"""
|
||||||
|
with open("/etc/krb5kdc/kadm5.acl", "w") as text_file:
|
||||||
|
text_file.write(kadm5_acl)
|
||||||
|
|
||||||
|
kerberos_command = f"""
|
||||||
|
krb5_newrealm <<EOF
|
||||||
|
{MASTER_PASSWORD}
|
||||||
|
{MASTER_PASSWORD}
|
||||||
|
EOF
|
||||||
|
"""
|
||||||
|
subprocess.run(kerberos_command, check=False, shell=True)
|
||||||
|
|
||||||
|
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
|
||||||
|
subprocess.run(delete_principal, check=True, shell=True)
|
||||||
|
|
||||||
|
create_principal = f'kadmin.local -q "addprinc -pw {KADMIN_PASSWORD} {KADMIN_PRINCIPAL_FULL}"'
|
||||||
|
subprocess.run(create_principal, check=True, shell=True)
|
||||||
|
|
||||||
|
kinit_command = f'echo {KADMIN_PASSWORD} | kinit'
|
||||||
|
subprocess.run(kinit_command, check=True, shell=True)
|
||||||
|
|
||||||
|
utils.pgcat_start()
|
||||||
|
|
||||||
|
|
||||||
|
def teardown_krb():
|
||||||
|
subprocess.run('kdestroy', check=True, shell=True)
|
||||||
|
|
||||||
|
delete_principal = f'kadmin.local -q "delete_principal -force {KADMIN_PRINCIPAL_FULL}"'
|
||||||
|
subprocess.run(delete_principal, check=True, shell=True)
|
||||||
|
|
||||||
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
|
|
||||||
|
|
||||||
|
def test_krb():
|
||||||
|
setup_krb()
|
||||||
|
# TODO test connect to database
|
||||||
|
|
||||||
|
utils.pgcat_start()
|
||||||
|
conn, cur = utils.connect_db(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
teardown_krb()
|
||||||
@@ -1,83 +1,29 @@
|
|||||||
from typing import Tuple
|
|
||||||
import psycopg2
|
|
||||||
import psutil
|
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
import utils
|
||||||
|
|
||||||
SHUTDOWN_TIMEOUT = 5
|
SHUTDOWN_TIMEOUT = 5
|
||||||
|
|
||||||
PGCAT_HOST = "127.0.0.1"
|
|
||||||
PGCAT_PORT = "6432"
|
|
||||||
|
|
||||||
|
|
||||||
def pgcat_start():
|
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
|
||||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
|
||||||
try:
|
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
|
||||||
if "pgcat" == proc.name():
|
|
||||||
os.kill(proc.pid, signal)
|
|
||||||
except Exception as e:
|
|
||||||
# The process can be gone when we send this signal
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
|
||||||
# Returns 0 if pgcat process exists
|
|
||||||
time.sleep(2)
|
|
||||||
if not os.system('pgrep pgcat'):
|
|
||||||
raise Exception("pgcat not closed after SIGTERM")
|
|
||||||
|
|
||||||
|
|
||||||
def connect_db(
|
|
||||||
autocommit: bool = True,
|
|
||||||
admin: bool = False,
|
|
||||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
|
||||||
|
|
||||||
if admin:
|
|
||||||
user = "admin_user"
|
|
||||||
password = "admin_pass"
|
|
||||||
db = "pgcat"
|
|
||||||
else:
|
|
||||||
user = "sharding_user"
|
|
||||||
password = "sharding_user"
|
|
||||||
db = "sharded_db"
|
|
||||||
|
|
||||||
conn = psycopg2.connect(
|
|
||||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
|
||||||
connect_timeout=2,
|
|
||||||
)
|
|
||||||
conn.autocommit = autocommit
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
return (conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
|
||||||
cur.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
def test_normal_db_access():
|
def test_normal_db_access():
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
conn, cur = connect_db(autocommit=False)
|
conn, cur = utils.connect_db(autocommit=False)
|
||||||
cur.execute("SELECT 1")
|
cur.execute("SELECT 1")
|
||||||
res = cur.fetchall()
|
res = cur.fetchall()
|
||||||
print(res)
|
print(res)
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_db_access():
|
def test_admin_db_access():
|
||||||
conn, cur = connect_db(admin=True)
|
conn, cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("SHOW POOLS")
|
cur.execute("SHOW POOLS")
|
||||||
res = cur.fetchall()
|
res = cur.fetchall()
|
||||||
print(res)
|
print(res)
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
|
||||||
def test_shutdown_logic():
|
def test_shutdown_logic():
|
||||||
@@ -86,17 +32,17 @@ def test_shutdown_logic():
|
|||||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and send query (not in transaction)
|
# Create client connection and send query (not in transaction)
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cur.execute("COMMIT;")
|
cur.execute("COMMIT;")
|
||||||
|
|
||||||
# Send sigint to pgcat
|
# Send sigint to pgcat
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||||
@@ -108,18 +54,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -138,24 +84,24 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH SIGINT
|
# HANDLE TRANSACTION WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||||
@@ -165,18 +111,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -194,30 +140,30 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
time_taken = time.perf_counter() - start
|
time_taken = time.perf_counter() - start
|
||||||
if time_taken > 0.1:
|
if time_taken > 0.1:
|
||||||
@@ -227,49 +173,49 @@ def test_shutdown_logic():
|
|||||||
else:
|
else:
|
||||||
raise Exception("Able connect to database during shutdown")
|
raise Exception("Able connect to database during shutdown")
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn, cur = connect_db(admin=True)
|
conn, cur = utils.connect_db(admin=True)
|
||||||
cur.execute("SHOW DATABASES;")
|
cur.execute("SHOW DATABASES;")
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception(e)
|
raise Exception(e)
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = connect_db()
|
transaction_conn, transaction_cur = utils.connect_db()
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
admin_conn, admin_cur = connect_db(admin=True)
|
admin_conn, admin_cur = utils.connect_db(admin=True)
|
||||||
admin_cur.execute("SHOW DATABASES;")
|
admin_cur.execute("SHOW DATABASES;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -277,24 +223,24 @@ def test_shutdown_logic():
|
|||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception("Could not execute admin command:", e)
|
raise Exception("Could not execute admin command:", e)
|
||||||
|
|
||||||
cleanup_conn(transaction_conn, transaction_cur)
|
utils.cleanup_conn(transaction_conn, transaction_cur)
|
||||||
cleanup_conn(admin_conn, admin_cur)
|
utils.cleanup_conn(admin_conn, admin_cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
pgcat_start()
|
utils.pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||||
conn, cur = connect_db()
|
conn, cur = utils.connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
pg_cat_send_signal(signal.SIGINT)
|
utils.pg_cat_send_signal(signal.SIGINT)
|
||||||
|
|
||||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||||
@@ -308,12 +254,5 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint and expected timeout")
|
raise Exception("Server not closed after sigint and expected timeout")
|
||||||
|
|
||||||
cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
|
||||||
|
|
||||||
|
|
||||||
test_normal_db_access()
|
|
||||||
test_admin_db_access()
|
|
||||||
test_shutdown_logic()
|
|
||||||
60
tests/python/utils.py
Normal file
60
tests/python/utils.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
from typing import Tuple
|
||||||
|
import os
|
||||||
|
import psutil
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
PGCAT_HOST = "127.0.0.1"
|
||||||
|
PGCAT_PORT = "6432"
|
||||||
|
|
||||||
|
def pgcat_start():
|
||||||
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if "pgcat" == proc.name():
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep pgcat'):
|
||||||
|
raise Exception("pgcat not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
|
def connect_db(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
password = "admin_pass"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
Reference in New Issue
Block a user