mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
* Adds SHUTDOWN command to PgCat as alternate option to sending SIGINT * Check if we're already in SHUTDOWN sequence * Send signal directly from shutdown instead of using channel * Add tests * trigger build * Lowercase response and boolean change * Update tests * Fix tests * typo
319 lines
8.7 KiB
Python
319 lines
8.7 KiB
Python
from typing import Tuple
|
|
import psycopg2
|
|
import psutil
|
|
import os
|
|
import signal
|
|
import time
|
|
|
|
# Seconds. The timeout scenario in test_shutdown_logic sleeps for
# SHUTDOWN_TIMEOUT + 1 before expecting pgcat to have closed.
# NOTE(review): presumably has to match the shutdown timeout configured in
# .circleci/pgcat.toml -- confirm against that file.
SHUTDOWN_TIMEOUT = 5
|
|
|
|
# Host part of the connection URL that connect_db() builds.
PGCAT_HOST = "127.0.0.1"
|
|
# Port part of the connection URL that connect_db() builds; kept as a string
# because it is only ever interpolated into that URL.
PGCAT_PORT = "6432"
|
|
|
|
|
|
def pgcat_start():
    """Launch a fresh pgcat instance for the tests.

    Any pgcat process that is already running is first sent SIGTERM through
    ``pg_cat_send_signal``. The debug binary is then started in the
    background with the CircleCI configuration file, and the function pauses
    for two seconds before returning.
    """
    launch_command = "./target/debug/pgcat .circleci/pgcat.toml &"

    pg_cat_send_signal(signal.SIGTERM)
    os.system(launch_command)
    # Fixed pause after launching; presumably this gives pgcat time to begin
    # accepting connections before the caller connects.
    time.sleep(2)
|
|
|
|
|
|
def pg_cat_send_signal(signal: signal.Signals):
    """Send ``signal`` to every running process named ``pgcat``.

    Delivery is best-effort: a process that disappears (or cannot be
    signalled) between enumeration and ``os.kill`` is reported with ``print``
    and the remaining processes are still signalled.

    Args:
        signal: The signal to deliver. Note that this parameter shadows the
            ``signal`` module inside the function body.

    Raises:
        Exception: If ``signal`` is SIGTERM and ``pgrep pgcat`` still finds a
            pgcat process two seconds after the signal was sent.
    """
    # The parameter hides the ``signal`` module in this scope, so bring the
    # module in under another name. Comparing against ``signal.SIGTERM``
    # directly would look SIGTERM up on the enum member that was passed in,
    # which is discouraged and not supported by every Python version.
    import signal as signal_module

    for proc in psutil.process_iter(["pid", "name"]):
        # Handle failures per process so one vanished process does not stop
        # the signal from reaching the others.
        try:
            if proc.name() == "pgcat":
                os.kill(proc.pid, signal)
        except (psutil.Error, OSError) as e:
            # The process can be gone when we send this signal
            print(e)

    if signal == signal_module.SIGTERM:
        time.sleep(2)
        # os.system returns 0 if a pgcat process still exists, i.e. pgrep
        # found a match.
        if not os.system("pgrep pgcat"):
            raise Exception("pgcat not closed after SIGTERM")
|
|
|
|
|
|
def connect_db(
    autocommit: bool = True,
    admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
    """Open a psycopg2 connection through pgcat and return it with a cursor.

    Args:
        autocommit: Value assigned to the new connection's ``autocommit``
            attribute.
        admin: When true, log in as ``admin_user`` to the ``pgcat`` database;
            otherwise log in as ``sharding_user`` to ``sharded_db``.

    Returns:
        A ``(connection, cursor)`` tuple; the cursor is created from the
        returned connection.
    """
    user, password, db = (
        ("admin_user", "admin_pass", "pgcat")
        if admin
        else ("sharding_user", "sharding_user", "sharded_db")
    )
    dsn = f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat"

    conn = psycopg2.connect(dsn, connect_timeout=2)
    conn.autocommit = autocommit
    return conn, conn.cursor()
|
|
|
|
|
|
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
    """Close ``cur`` first and then ``conn``."""
    for handle in (cur, conn):
        handle.close()
|
|
|
|
|
|
def test_normal_db_access():
    """Run ``SELECT 1`` as the regular (non-admin) user and print the rows.

    The connection is opened with autocommit disabled and is closed again
    once the result has been printed.
    """
    connection, cursor = connect_db(autocommit=False)
    cursor.execute("SELECT 1")
    print(cursor.fetchall())
    cleanup_conn(connection, cursor)
|
|
|
|
|
|
def test_admin_db_access():
    """Run ``SHOW POOLS`` over an admin connection and print the rows.

    The admin connection is closed again once the result has been printed.
    """
    connection, cursor = connect_db(admin=True)
    cursor.execute("SHOW POOLS")
    print(cursor.fetchall())
    cleanup_conn(connection, cursor)
|
|
|
|
|
|
def _expect_query_rejected(cur, failure_message):
    """Run ``SELECT 1;`` on ``cur`` and require it to fail.

    Returns normally when the query raises ``psycopg2.OperationalError``;
    raises ``Exception(failure_message)`` when the query succeeds.
    """
    try:
        cur.execute("SELECT 1;")
    except psycopg2.OperationalError:
        return
    # Fail if query execution succeeded
    raise Exception(failure_message)


def _expect_query_served(cur, failure_message):
    """Run ``SELECT 1;`` on ``cur`` and require it to succeed.

    Raises ``Exception(failure_message, <pgerror>)`` chained to the original
    error when the query raises ``psycopg2.OperationalError``.
    """
    try:
        cur.execute("SELECT 1;")
    except psycopg2.OperationalError as e:
        # Fail if query fails since server closed
        raise Exception(failure_message, e.pgerror) from e


def test_shutdown_logic():
    """Exercise pgcat's shutdown handling through a sequence of scenarios.

    Every scenario starts a fresh pgcat with ``pgcat_start()``, triggers a
    shutdown with either SIGINT or the admin ``SHUTDOWN;`` command, checks
    how existing and new connections behave, and finally sends SIGTERM so
    the next scenario begins from a clean state.

    Raises:
        Exception: When any scenario observes behaviour other than the one
            it checks for.
    """
    # - - - - - - - - - - - - - - - - - -
    # NO ACTIVE QUERIES SIGINT HANDLING

    # Start pgcat
    pgcat_start()

    # Create client connection and run a complete transaction, so the client
    # is not in a transaction when the signal arrives
    conn, cur = connect_db()

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")
    cur.execute("COMMIT;")

    # Send sigint to pgcat
    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # Check that any new queries fail after sigint since server should close
    # with no active transactions
    _expect_query_rejected(cur, "Server not closed after sigint")

    cleanup_conn(conn, cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND

    # Start pgcat
    pgcat_start()

    # Create client and admin connections, then run a complete transaction
    # on the client connection
    conn, cur = connect_db()
    admin_conn, admin_cur = connect_db(admin=True)

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")
    cur.execute("COMMIT;")

    # Send SHUTDOWN command to pgcat while not in transaction
    admin_cur.execute("SHUTDOWN;")
    time.sleep(1)

    # Check that any new queries fail after SHUTDOWN command since server
    # should close with no active transactions
    _expect_query_rejected(cur, "Server not closed after SHUTDOWN command")

    cleanup_conn(conn, cur)
    cleanup_conn(admin_conn, admin_cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # HANDLE TRANSACTION WITH SIGINT

    # Start pgcat
    pgcat_start()

    # Create client connection and begin transaction
    conn, cur = connect_db()

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # Check that any new queries succeed after sigint since server should
    # still allow transaction to complete
    _expect_query_served(cur, "Server closed while in transaction")

    cleanup_conn(conn, cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND

    # Start pgcat
    pgcat_start()

    # Create client connection and begin transaction
    conn, cur = connect_db()
    admin_conn, admin_cur = connect_db(admin=True)

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")

    # Send SHUTDOWN command to pgcat while still in transaction and check
    # that the first column of the first returned row is "t"
    admin_cur.execute("SHUTDOWN;")
    if admin_cur.fetchall()[0][0] != "t":
        raise Exception("PgCat unable to send signal")
    time.sleep(1)

    # Check that any new queries succeed after SHUTDOWN command since server
    # should still allow transaction to complete
    _expect_query_served(cur, "Server closed while in transaction")

    cleanup_conn(conn, cur)
    cleanup_conn(admin_conn, admin_cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN

    # Start pgcat
    pgcat_start()

    # Create client connection and begin transaction
    transaction_conn, transaction_cur = connect_db()

    transaction_cur.execute("BEGIN;")
    transaction_cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # A new non-admin connection must be refused, and refused quickly
    start = time.perf_counter()
    try:
        conn, cur = connect_db()
        cur.execute("SELECT 1;")
        cleanup_conn(conn, cur)
    except psycopg2.OperationalError as e:
        time_taken = time.perf_counter() - start
        if time_taken > 0.1:
            raise Exception(
                f"Failed to reject connection within 0.1 seconds, got {time_taken} seconds"
            ) from e
    else:
        raise Exception("Able connect to database during shutdown")

    cleanup_conn(transaction_conn, transaction_cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN

    # Start pgcat
    pgcat_start()

    # Create client connection and begin transaction
    transaction_conn, transaction_cur = connect_db()

    transaction_cur.execute("BEGIN;")
    transaction_cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # A new admin connection must still be accepted and usable
    try:
        conn, cur = connect_db(admin=True)
        cur.execute("SHOW DATABASES;")
        cleanup_conn(conn, cur)
    except psycopg2.OperationalError as e:
        raise Exception(e) from e

    cleanup_conn(transaction_conn, transaction_cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN

    # Start pgcat
    pgcat_start()

    # Create client connection and begin transaction
    transaction_conn, transaction_cur = connect_db()
    transaction_cur.execute("BEGIN;")
    transaction_cur.execute("SELECT 1;")

    # Open the admin connection before the shutdown is triggered
    admin_conn, admin_cur = connect_db(admin=True)
    admin_cur.execute("SHOW DATABASES;")

    # Send sigint to pgcat while still in transaction
    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # The already-open admin connection must keep working
    try:
        admin_cur.execute("SHOW DATABASES;")
    except psycopg2.OperationalError as e:
        raise Exception("Could not execute admin command:", e) from e

    cleanup_conn(transaction_conn, transaction_cur)
    cleanup_conn(admin_conn, admin_cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
    # HANDLE SHUTDOWN TIMEOUT WITH SIGINT

    # Start pgcat
    pgcat_start()

    # Create client connection and begin transaction, which should prevent
    # server shutdown unless shutdown timeout is reached
    conn, cur = connect_db()

    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")

    # Send sigint to pgcat while still in transaction
    pg_cat_send_signal(signal.SIGINT)

    # pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep
    # for SHUTDOWN_TIMEOUT + 1 seconds
    time.sleep(SHUTDOWN_TIMEOUT + 1)

    # Check that any new queries fail once the timeout has elapsed, even
    # though the transaction was never completed
    _expect_query_rejected(cur, "Server not closed after sigint and expected timeout")

    cleanup_conn(conn, cur)
    pg_cat_send_signal(signal.SIGTERM)

    # - - - - - - - - - - - - - - - - - -
|
|
|
|
|
|
# Run the three tests in their original order whenever this file is executed.
for run_test in (test_normal_db_access, test_admin_db_access, test_shutdown_logic):
    run_test()
|