mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
Compare commits
2 Commits
circleci_A
...
mostafa_ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a6e674fa89 | ||
|
|
7ee85912f3 |
@@ -59,7 +59,6 @@ admin_password = "admin_pass"
|
|||||||
# session: one server connection per connected client
|
# session: one server connection per connected client
|
||||||
# transaction: one server connection per client transaction
|
# transaction: one server connection per client transaction
|
||||||
pool_mode = "transaction"
|
pool_mode = "transaction"
|
||||||
prepared_statements_cache_size = 500
|
|
||||||
|
|
||||||
# If the client doesn't specify, route traffic to
|
# If the client doesn't specify, route traffic to
|
||||||
# this role by default.
|
# this role by default.
|
||||||
@@ -142,7 +141,6 @@ query_parser_enabled = true
|
|||||||
query_parser_read_write_splitting = true
|
query_parser_read_write_splitting = true
|
||||||
primary_reads_enabled = true
|
primary_reads_enabled = true
|
||||||
sharding_function = "pg_bigint_hash"
|
sharding_function = "pg_bigint_hash"
|
||||||
prepared_statements_cache_size = 500
|
|
||||||
|
|
||||||
[pools.simple_db.users.0]
|
[pools.simple_db.users.0]
|
||||||
username = "simple_user"
|
username = "simple_user"
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ cd ../..
|
|||||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||||
#
|
#
|
||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
pytest || exit 1
|
python3 tests/python/tests.py || exit 1
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
|
|||||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -10,5 +10,4 @@ lcov.info
|
|||||||
dev/.bash_history
|
dev/.bash_history
|
||||||
dev/cache
|
dev/cache
|
||||||
!dev/cache/.keepme
|
!dev/cache/.keepme
|
||||||
.venv
|
.venv
|
||||||
**/__pycache__
|
|
||||||
@@ -6,32 +6,6 @@ Thank you for contributing! Just a few tips here:
|
|||||||
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
2. Run the test suite (e.g. `pgbench`) to make sure everything still works. The tests are in `.circleci/run_tests.sh`.
|
||||||
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
3. Performance is important, make sure there are no regressions in your branch vs. `main`.
|
||||||
|
|
||||||
## How to run the integration tests locally and iterate on them
|
|
||||||
We have integration tests written in Ruby, Python, Go and Rust.
|
|
||||||
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
|
|
||||||
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependancies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
|
|
||||||
|
|
||||||
|
|
||||||
Quite simply, make sure you have docker installed and then run
|
|
||||||
`./start_test_env.sh`
|
|
||||||
|
|
||||||
That is it!
|
|
||||||
|
|
||||||
Within this test environment you can modify the file in your favorite IDE and rerun the tests without having to bootstrap the entire environment again.
|
|
||||||
|
|
||||||
Once the environment is ready, you can run the tests by running
|
|
||||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
|
||||||
Python: `cd /app/ && pytest`
|
|
||||||
Rust: `cd /app/tests/rust && cargo run`
|
|
||||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
|
||||||
|
|
||||||
You can also rebuild PgCat directly within the environment and the tests will run against the newly built binary
|
|
||||||
To rebuild PgCat, just run `cargo build` within the container under `/app`
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Happy hacking!
|
Happy hacking!
|
||||||
|
|
||||||
## TODOs
|
## TODOs
|
||||||
|
|||||||
@@ -1,34 +0,0 @@
|
|||||||
GREEN="\033[0;32m"
|
|
||||||
RED="\033[0;31m"
|
|
||||||
BLUE="\033[0;34m"
|
|
||||||
RESET="\033[0m"
|
|
||||||
|
|
||||||
|
|
||||||
cd tests/docker/
|
|
||||||
docker compose kill main || true
|
|
||||||
docker compose build main
|
|
||||||
docker compose down
|
|
||||||
docker compose up -d
|
|
||||||
# wait for the container to start
|
|
||||||
while ! docker compose exec main ls; do
|
|
||||||
echo "Waiting for test environment to start"
|
|
||||||
sleep 1
|
|
||||||
done
|
|
||||||
echo "==================================="
|
|
||||||
docker compose exec -e LOG_LEVEL=error -d main toxiproxy-server
|
|
||||||
docker compose exec --workdir /app main cargo build
|
|
||||||
docker compose exec -d --workdir /app main ./target/debug/pgcat ./.circleci/pgcat.toml
|
|
||||||
docker compose exec --workdir /app/tests/ruby main bundle install
|
|
||||||
docker compose exec --workdir /app/tests/python main pip3 install -r requirements.txt
|
|
||||||
echo "Interactive test environment ready"
|
|
||||||
echo "To run integration tests, you can use the following commands:"
|
|
||||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
|
||||||
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
|
||||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
|
||||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
|
||||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
|
||||||
echo "You can rebuild PgCat from within the container by running"
|
|
||||||
echo -e " ${GREEN}cargo build${RESET}"
|
|
||||||
echo "and then run the tests again"
|
|
||||||
echo "==================================="
|
|
||||||
docker compose exec --workdir /app/tests main bash
|
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
version: "3"
|
||||||
services:
|
services:
|
||||||
pg1:
|
pg1:
|
||||||
image: postgres:14
|
image: postgres:14
|
||||||
@@ -47,8 +48,6 @@ services:
|
|||||||
main:
|
main:
|
||||||
build: .
|
build: .
|
||||||
command: ["bash", "/app/tests/docker/run.sh"]
|
command: ["bash", "/app/tests/docker/run.sh"]
|
||||||
environment:
|
|
||||||
- INTERACTIVE_TEST_ENVIRONMENT=true
|
|
||||||
volumes:
|
volumes:
|
||||||
- ../../:/app/
|
- ../../:/app/
|
||||||
- /app/target/
|
- /app/target/
|
||||||
|
|||||||
@@ -5,38 +5,6 @@ rm /app/*.profraw || true
|
|||||||
rm /app/pgcat.profdata || true
|
rm /app/pgcat.profdata || true
|
||||||
rm -rf /app/cov || true
|
rm -rf /app/cov || true
|
||||||
|
|
||||||
# Prepares the interactive test environment
|
|
||||||
#
|
|
||||||
if [ -n "$INTERACTIVE_TEST_ENVIRONMENT" ]; then
|
|
||||||
ports=(5432 7432 8432 9432 10432)
|
|
||||||
for port in "${ports[@]}"; do
|
|
||||||
is_it_up=0
|
|
||||||
attempts=0
|
|
||||||
while [ $is_it_up -eq 0 ]; do
|
|
||||||
PGPASSWORD=postgres psql -h 127.0.0.1 -p $port -U postgres -c '\q' > /dev/null 2>&1
|
|
||||||
if [ $? -eq 0 ]; then
|
|
||||||
echo "PostgreSQL on port $port is up."
|
|
||||||
is_it_up=1
|
|
||||||
else
|
|
||||||
attempts=$((attempts+1))
|
|
||||||
if [ $attempts -gt 10 ]; then
|
|
||||||
echo "PostgreSQL on port $port is down, giving up."
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
echo "PostgreSQL on port $port is down, waiting for it to start."
|
|
||||||
sleep 1
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
done
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 10432 -U postgres -f /app/tests/sharding/query_routing_setup.sql
|
|
||||||
sleep 100000000000000000
|
|
||||||
exit 0
|
|
||||||
fi
|
|
||||||
|
|
||||||
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
export LLVM_PROFILE_FILE="/app/pgcat-%m-%p.profraw"
|
||||||
export RUSTC_BOOTSTRAP=1
|
export RUSTC_BOOTSTRAP=1
|
||||||
export CARGO_INCREMENTAL=0
|
export CARGO_INCREMENTAL=0
|
||||||
|
|||||||
@@ -1,3 +1,2 @@
|
|||||||
pytest
|
|
||||||
psycopg2==2.9.3
|
psycopg2==2.9.3
|
||||||
psutil==5.9.1
|
psutil==5.9.1
|
||||||
@@ -1,29 +1,83 @@
|
|||||||
|
from typing import Tuple
|
||||||
|
import psycopg2
|
||||||
|
import psutil
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import psycopg2
|
|
||||||
|
|
||||||
import utils
|
|
||||||
|
|
||||||
SHUTDOWN_TIMEOUT = 5
|
SHUTDOWN_TIMEOUT = 5
|
||||||
|
|
||||||
|
PGCAT_HOST = "127.0.0.1"
|
||||||
|
PGCAT_PORT = "6432"
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_start():
|
||||||
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if "pgcat" == proc.name():
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep pgcat'):
|
||||||
|
raise Exception("pgcat not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
|
def connect_db(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
password = "admin_pass"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def test_normal_db_access():
|
def test_normal_db_access():
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
conn, cur = utils.connect_db(autocommit=False)
|
conn, cur = connect_db(autocommit=False)
|
||||||
cur.execute("SELECT 1")
|
cur.execute("SELECT 1")
|
||||||
res = cur.fetchall()
|
res = cur.fetchall()
|
||||||
print(res)
|
print(res)
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_db_access():
|
def test_admin_db_access():
|
||||||
conn, cur = utils.connect_db(admin=True)
|
conn, cur = connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("SHOW POOLS")
|
cur.execute("SHOW POOLS")
|
||||||
res = cur.fetchall()
|
res = cur.fetchall()
|
||||||
print(res)
|
print(res)
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
|
||||||
def test_shutdown_logic():
|
def test_shutdown_logic():
|
||||||
@@ -32,17 +86,17 @@ def test_shutdown_logic():
|
|||||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and send query (not in transaction)
|
# Create client connection and send query (not in transaction)
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cur.execute("COMMIT;")
|
cur.execute("COMMIT;")
|
||||||
|
|
||||||
# Send sigint to pgcat
|
# Send sigint to pgcat
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||||
@@ -54,18 +108,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
admin_conn, admin_cur = connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -84,24 +138,24 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.cleanup_conn(admin_conn, admin_cur)
|
cleanup_conn(admin_conn, admin_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH SIGINT
|
# HANDLE TRANSACTION WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||||
@@ -111,18 +165,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
admin_conn, admin_cur = connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -140,30 +194,30 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.cleanup_conn(admin_conn, admin_cur)
|
cleanup_conn(admin_conn, admin_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = utils.connect_db()
|
transaction_conn, transaction_cur = connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
time_taken = time.perf_counter() - start
|
time_taken = time.perf_counter() - start
|
||||||
if time_taken > 0.1:
|
if time_taken > 0.1:
|
||||||
@@ -173,49 +227,49 @@ def test_shutdown_logic():
|
|||||||
else:
|
else:
|
||||||
raise Exception("Able connect to database during shutdown")
|
raise Exception("Able connect to database during shutdown")
|
||||||
|
|
||||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
cleanup_conn(transaction_conn, transaction_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = utils.connect_db()
|
transaction_conn, transaction_cur = connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn, cur = utils.connect_db(admin=True)
|
conn, cur = connect_db(admin=True)
|
||||||
cur.execute("SHOW DATABASES;")
|
cur.execute("SHOW DATABASES;")
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception(e)
|
raise Exception(e)
|
||||||
|
|
||||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
cleanup_conn(transaction_conn, transaction_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = utils.connect_db()
|
transaction_conn, transaction_cur = connect_db()
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
admin_conn, admin_cur = connect_db(admin=True)
|
||||||
admin_cur.execute("SHOW DATABASES;")
|
admin_cur.execute("SHOW DATABASES;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -223,24 +277,24 @@ def test_shutdown_logic():
|
|||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception("Could not execute admin command:", e)
|
raise Exception("Could not execute admin command:", e)
|
||||||
|
|
||||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
cleanup_conn(transaction_conn, transaction_cur)
|
||||||
utils.cleanup_conn(admin_conn, admin_cur)
|
cleanup_conn(admin_conn, admin_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
|
|
||||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||||
@@ -254,5 +308,12 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint and expected timeout")
|
raise Exception("Server not closed after sigint and expected timeout")
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
# - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
|
||||||
|
test_normal_db_access()
|
||||||
|
test_admin_db_access()
|
||||||
|
test_shutdown_logic()
|
||||||
@@ -1,60 +0,0 @@
|
|||||||
from typing import Tuple
|
|
||||||
import os
|
|
||||||
import psutil
|
|
||||||
import signal
|
|
||||||
import time
|
|
||||||
|
|
||||||
import psycopg2
|
|
||||||
|
|
||||||
PGCAT_HOST = "127.0.0.1"
|
|
||||||
PGCAT_PORT = "6432"
|
|
||||||
|
|
||||||
def pgcat_start():
|
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
|
||||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
|
||||||
try:
|
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
|
||||||
if "pgcat" == proc.name():
|
|
||||||
os.kill(proc.pid, signal)
|
|
||||||
except Exception as e:
|
|
||||||
# The process can be gone when we send this signal
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
|
||||||
# Returns 0 if pgcat process exists
|
|
||||||
time.sleep(2)
|
|
||||||
if not os.system('pgrep pgcat'):
|
|
||||||
raise Exception("pgcat not closed after SIGTERM")
|
|
||||||
|
|
||||||
|
|
||||||
def connect_db(
|
|
||||||
autocommit: bool = True,
|
|
||||||
admin: bool = False,
|
|
||||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
|
||||||
|
|
||||||
if admin:
|
|
||||||
user = "admin_user"
|
|
||||||
password = "admin_pass"
|
|
||||||
db = "pgcat"
|
|
||||||
else:
|
|
||||||
user = "sharding_user"
|
|
||||||
password = "sharding_user"
|
|
||||||
db = "sharded_db"
|
|
||||||
|
|
||||||
conn = psycopg2.connect(
|
|
||||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
|
||||||
connect_timeout=2,
|
|
||||||
)
|
|
||||||
conn.autocommit = autocommit
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
return (conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
|
||||||
cur.close()
|
|
||||||
conn.close()
|
|
||||||
664
tests/rust/Cargo.lock
generated
664
tests/rust/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -15,11 +15,13 @@ async fn test_prepared_statements() {
|
|||||||
for _ in 0..5 {
|
for _ in 0..5 {
|
||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = tokio::task::spawn(async move {
|
||||||
for i in 0..1000 {
|
for _ in 0..1000 {
|
||||||
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
|
match sqlx::query("SELECT one").fetch_all(&pool).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
panic!("prepared statement error: {}", err);
|
if err.to_string().contains("prepared statement") {
|
||||||
|
panic!("prepared statement error: {}", err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user