diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 41aae91..a93b484 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -107,7 +107,7 @@ cd ../.. # These tests will start and stop the pgcat server so it will need to be restarted after the tests # pip3 install -r tests/python/requirements.txt -python3 tests/python/tests.py || exit 1 +pytest || exit 1 # diff --git a/.gitignore b/.gitignore index 40d5de1..ae13481 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ lcov.info dev/.bash_history dev/cache !dev/cache/.keepme -.venv \ No newline at end of file +.venv +**/__pycache__ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 98c0006..b946801 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -7,7 +7,7 @@ Thank you for contributing! Just a few tips here: 3. Performance is important, make sure there are no regressions in your branch vs. `main`. ## How to run the integration tests locally and iterate on them -We have integration tests written in Ruby, Python, Go and Rust. +We have integration tests written in Ruby, Python, Go and Rust. Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround. Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell. @@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re Once the environment is ready, you can run the tests by running Ruby: `cd /app/tests/ruby && bundle exec ruby .rb --format documentation` -Python: `cd /app && python3 tests/python/tests.py` +Python: `cd /app/ && pytest` Rust: `cd /app/tests/rust && cargo run` Go: `cd /app/tests/go && /usr/local/go/bin/go test` diff --git a/start_test_env.sh b/start_test_env.sh index 24a6b3f..c7ec4b1 100755 --- a/start_test_env.sh +++ b/start_test_env.sh @@ -23,11 +23,11 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement echo "Interactive test environment ready" echo "To run integration tests, you can use the following commands:" echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}" -echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}" +echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}" echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}" echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}" echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again" -echo "You can rebuild PgCat from within the container by running" +echo "You can rebuild PgCat from within the container by running" echo -e " ${GREEN}cargo build${RESET}" echo "and then run the tests again" echo "===================================" diff --git a/tests/python/conftest.py b/tests/python/conftest.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/python/requirements.txt b/tests/python/requirements.txt index eebd9c9..ec8b08f 100644 --- a/tests/python/requirements.txt +++ b/tests/python/requirements.txt @@ -1,2 +1,3 @@ +pytest psycopg2==2.9.3 -psutil==5.9.1 \ No newline at end of file +psutil==5.9.1 diff --git a/tests/python/tests.py b/tests/python/test_pgcat.py similarity index 61% rename from tests/python/tests.py rename to tests/python/test_pgcat.py index cd54081..dc2f11e 100644 --- a/tests/python/tests.py +++ b/tests/python/test_pgcat.py @@ -1,83 +1,29 @@ -from typing import Tuple -import psycopg2 -import psutil import os import signal import time +import psycopg2 + +import utils + SHUTDOWN_TIMEOUT = 5 -PGCAT_HOST = "127.0.0.1" -PGCAT_PORT = "6432" - - -def pgcat_start(): - pg_cat_send_signal(signal.SIGTERM) - os.system("./target/debug/pgcat .circleci/pgcat.toml &") - time.sleep(2) - - -def pg_cat_send_signal(signal: signal.Signals): - try: - for proc in psutil.process_iter(["pid", "name"]): - if "pgcat" == proc.name(): - os.kill(proc.pid, signal) - except Exception as e: - # The process can be gone when we send this signal - print(e) - - if signal == signal.SIGTERM: - # Returns 0 if pgcat process exists - time.sleep(2) - if not os.system('pgrep pgcat'): - raise Exception("pgcat not closed after SIGTERM") - - -def connect_db( - autocommit: bool = True, - admin: bool = False, -) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]: - - if admin: - user = "admin_user" - password = "admin_pass" - db = "pgcat" - else: - user = "sharding_user" - password = "sharding_user" - db = "sharded_db" - - conn = psycopg2.connect( - f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat", - connect_timeout=2, - ) - conn.autocommit = autocommit - cur = conn.cursor() - - return (conn, cur) - - -def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor): - cur.close() - conn.close() - - def test_normal_db_access(): - pgcat_start() - conn, cur = connect_db(autocommit=False) + utils.pgcat_start() + conn, cur = utils.connect_db(autocommit=False) cur.execute("SELECT 1") res = cur.fetchall() print(res) - cleanup_conn(conn, cur) + utils.cleanup_conn(conn, cur) def test_admin_db_access(): - conn, cur = connect_db(admin=True) + conn, cur = utils.connect_db(admin=True) cur.execute("SHOW POOLS") res = cur.fetchall() print(res) - cleanup_conn(conn, cur) + utils.cleanup_conn(conn, cur) def test_shutdown_logic(): @@ -86,17 +32,17 @@ def test_shutdown_logic(): # NO ACTIVE QUERIES SIGINT HANDLING # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and send query (not in transaction) - conn, cur = connect_db() + conn, cur = utils.connect_db() cur.execute("BEGIN;") cur.execute("SELECT 1;") cur.execute("COMMIT;") # Send sigint to pgcat - pg_cat_send_signal(signal.SIGINT) + utils.pg_cat_send_signal(signal.SIGINT) time.sleep(1) # Check that any new queries fail after sigint since server should close with no active transactions @@ -108,18 +54,18 @@ def test_shutdown_logic(): # Fail if query execution succeeded raise Exception("Server not closed after sigint") - cleanup_conn(conn, cur) - pg_cat_send_signal(signal.SIGTERM) + utils.cleanup_conn(conn, cur) + utils.pg_cat_send_signal(signal.SIGTERM) # - - - - - - - - - - - - - - - - - - # NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and begin transaction - conn, cur = connect_db() - admin_conn, admin_cur = connect_db(admin=True) + conn, cur = utils.connect_db() + admin_conn, admin_cur = utils.connect_db(admin=True) cur.execute("BEGIN;") cur.execute("SELECT 1;") @@ -138,24 +84,24 @@ def test_shutdown_logic(): # Fail if query execution succeeded raise Exception("Server not closed after sigint") - cleanup_conn(conn, cur) - cleanup_conn(admin_conn, admin_cur) - pg_cat_send_signal(signal.SIGTERM) + utils.cleanup_conn(conn, cur) + utils.cleanup_conn(admin_conn, admin_cur) + utils.pg_cat_send_signal(signal.SIGTERM) # - - - - - - - - - - - - - - - - - - # HANDLE TRANSACTION WITH SIGINT # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and begin transaction - conn, cur = connect_db() + conn, cur = utils.connect_db() cur.execute("BEGIN;") cur.execute("SELECT 1;") # Send sigint to pgcat while still in transaction - pg_cat_send_signal(signal.SIGINT) + utils.pg_cat_send_signal(signal.SIGINT) time.sleep(1) # Check that any new queries succeed after sigint since server should still allow transaction to complete @@ -165,18 +111,18 @@ def test_shutdown_logic(): # Fail if query fails since server closed raise Exception("Server closed while in transaction", e.pgerror) - cleanup_conn(conn, cur) - pg_cat_send_signal(signal.SIGTERM) + utils.cleanup_conn(conn, cur) + utils.pg_cat_send_signal(signal.SIGTERM) # - - - - - - - - - - - - - - - - - - # HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and begin transaction - conn, cur = connect_db() - admin_conn, admin_cur = connect_db(admin=True) + conn, cur = utils.connect_db() + admin_conn, admin_cur = utils.connect_db(admin=True) cur.execute("BEGIN;") cur.execute("SELECT 1;") @@ -194,30 +140,30 @@ def test_shutdown_logic(): # Fail if query fails since server closed raise Exception("Server closed while in transaction", e.pgerror) - cleanup_conn(conn, cur) - cleanup_conn(admin_conn, admin_cur) - pg_cat_send_signal(signal.SIGTERM) + utils.cleanup_conn(conn, cur) + utils.cleanup_conn(admin_conn, admin_cur) + utils.pg_cat_send_signal(signal.SIGTERM) # - - - - - - - - - - - - - - - - - - # NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and begin transaction - transaction_conn, transaction_cur = connect_db() + transaction_conn, transaction_cur = utils.connect_db() transaction_cur.execute("BEGIN;") transaction_cur.execute("SELECT 1;") # Send sigint to pgcat while still in transaction - pg_cat_send_signal(signal.SIGINT) + utils.pg_cat_send_signal(signal.SIGINT) time.sleep(1) start = time.perf_counter() try: - conn, cur = connect_db() + conn, cur = utils.connect_db() cur.execute("SELECT 1;") - cleanup_conn(conn, cur) + utils.cleanup_conn(conn, cur) except psycopg2.OperationalError as e: time_taken = time.perf_counter() - start if time_taken > 0.1: @@ -227,49 +173,49 @@ def test_shutdown_logic(): else: raise Exception("Able connect to database during shutdown") - cleanup_conn(transaction_conn, transaction_cur) - pg_cat_send_signal(signal.SIGTERM) + utils.cleanup_conn(transaction_conn, transaction_cur) + utils.pg_cat_send_signal(signal.SIGTERM) # - - - - - - - - - - - - - - - - - - # ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and begin transaction - transaction_conn, transaction_cur = connect_db() + transaction_conn, transaction_cur = utils.connect_db() transaction_cur.execute("BEGIN;") transaction_cur.execute("SELECT 1;") # Send sigint to pgcat while still in transaction - pg_cat_send_signal(signal.SIGINT) + utils.pg_cat_send_signal(signal.SIGINT) time.sleep(1) try: - conn, cur = connect_db(admin=True) + conn, cur = utils.connect_db(admin=True) cur.execute("SHOW DATABASES;") - cleanup_conn(conn, cur) + utils.cleanup_conn(conn, cur) except psycopg2.OperationalError as e: raise Exception(e) - cleanup_conn(transaction_conn, transaction_cur) - pg_cat_send_signal(signal.SIGTERM) + utils.cleanup_conn(transaction_conn, transaction_cur) + utils.pg_cat_send_signal(signal.SIGTERM) # - - - - - - - - - - - - - - - - - - # ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and begin transaction - transaction_conn, transaction_cur = connect_db() + transaction_conn, transaction_cur = utils.connect_db() transaction_cur.execute("BEGIN;") transaction_cur.execute("SELECT 1;") - admin_conn, admin_cur = connect_db(admin=True) + admin_conn, admin_cur = utils.connect_db(admin=True) admin_cur.execute("SHOW DATABASES;") # Send sigint to pgcat while still in transaction - pg_cat_send_signal(signal.SIGINT) + utils.pg_cat_send_signal(signal.SIGINT) time.sleep(1) try: @@ -277,24 +223,24 @@ def test_shutdown_logic(): except psycopg2.OperationalError as e: raise Exception("Could not execute admin command:", e) - cleanup_conn(transaction_conn, transaction_cur) - cleanup_conn(admin_conn, admin_cur) - pg_cat_send_signal(signal.SIGTERM) + utils.cleanup_conn(transaction_conn, transaction_cur) + utils.cleanup_conn(admin_conn, admin_cur) + utils.pg_cat_send_signal(signal.SIGTERM) # - - - - - - - - - - - - - - - - - - # HANDLE SHUTDOWN TIMEOUT WITH SIGINT # Start pgcat - pgcat_start() + utils.pgcat_start() # Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached - conn, cur = connect_db() + conn, cur = utils.connect_db() cur.execute("BEGIN;") cur.execute("SELECT 1;") # Send sigint to pgcat while still in transaction - pg_cat_send_signal(signal.SIGINT) + utils.pg_cat_send_signal(signal.SIGINT) # pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds time.sleep(SHUTDOWN_TIMEOUT + 1) @@ -308,12 +254,5 @@ def test_shutdown_logic(): # Fail if query execution succeeded raise Exception("Server not closed after sigint and expected timeout") - cleanup_conn(conn, cur) - pg_cat_send_signal(signal.SIGTERM) - - # - - - - - - - - - - - - - - - - - - - - -test_normal_db_access() -test_admin_db_access() -test_shutdown_logic() + utils.cleanup_conn(conn, cur) + utils.pg_cat_send_signal(signal.SIGTERM) diff --git a/tests/python/utils.py b/tests/python/utils.py new file mode 100644 index 0000000..5c49bce --- /dev/null +++ b/tests/python/utils.py @@ -0,0 +1,60 @@ +from typing import Tuple +import os +import psutil +import signal +import time + +import psycopg2 + +PGCAT_HOST = "127.0.0.1" +PGCAT_PORT = "6432" + +def pgcat_start(): + pg_cat_send_signal(signal.SIGTERM) + os.system("./target/debug/pgcat .circleci/pgcat.toml &") + time.sleep(2) + + +def pg_cat_send_signal(signal: signal.Signals): + try: + for proc in psutil.process_iter(["pid", "name"]): + if "pgcat" == proc.name(): + os.kill(proc.pid, signal) + except Exception as e: + # The process can be gone when we send this signal + print(e) + + if signal == signal.SIGTERM: + # Returns 0 if pgcat process exists + time.sleep(2) + if not os.system('pgrep pgcat'): + raise Exception("pgcat not closed after SIGTERM") + + +def connect_db( + autocommit: bool = True, + admin: bool = False, +) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]: + + if admin: + user = "admin_user" + password = "admin_pass" + db = "pgcat" + else: + user = "sharding_user" + password = "sharding_user" + db = "sharded_db" + + conn = psycopg2.connect( + f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat", + connect_timeout=2, + ) + conn.autocommit = autocommit + cur = conn.cursor() + + return (conn, cur) + + +def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor): + cur.close() + conn.close()