Compare commits

..

6 Commits

Author SHA1 Message Date
Mostafa
b24e5395be fix lint issues 2024-09-05 08:19:08 -05:00
Mostafa
53b6b56563 Resolve conflicts 2024-09-05 08:15:18 -05:00
Andrew Jackson
f73d15f82c Fix CI script to allow consecutive runs locally (#793)
Co-authored-by: CommanderKeynes <andrewjackson947@gmail.com>
2024-09-05 08:01:33 -05:00
Curtis Myzie
1cc71f3990 Remove lint 2024-05-17 19:19:00 +06:00
Curtis Myzie
baa6f661aa Remove lint 2024-05-17 19:18:38 +06:00
Curtis Myzie
ae2b83d239 Prometheus metrics updates:
* Add username label to deconflict metrics that would otherwise
   have duplicate labels across different pools.
 * Group metrics by name and only print HELP and TYPE once per
   metric name.
 * Sort labels for a deterministic output.
2024-05-17 19:12:37 +06:00
9 changed files with 198 additions and 140 deletions

View File

@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Start Toxiproxy # Start Toxiproxy
kill -9 $(pgrep toxiproxy) || true
LOG_LEVEL=error toxiproxy-server & LOG_LEVEL=error toxiproxy-server &
sleep 1 sleep 1
@@ -106,7 +107,7 @@ cd ../..
# These tests will start and stop the pgcat server so it will need to be restarted after the tests # These tests will start and stop the pgcat server so it will need to be restarted after the tests
# #
pip3 install -r tests/python/requirements.txt pip3 install -r tests/python/requirements.txt
pytest || exit 1 python3 tests/python/tests.py || exit 1
# #
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
# Allow for graceful shutdown # Allow for graceful shutdown
sleep 1 sleep 1
kill -9 $(pgrep toxiproxy)
sleep 1

3
.gitignore vendored
View File

@@ -10,5 +10,4 @@ lcov.info
dev/.bash_history dev/.bash_history
dev/cache dev/cache
!dev/cache/.keepme !dev/cache/.keepme
.venv .venv
**/__pycache__

View File

@@ -7,7 +7,7 @@ Thank you for contributing! Just a few tips here:
3. Performance is important, make sure there are no regressions in your branch vs. `main`. 3. Performance is important, make sure there are no regressions in your branch vs. `main`.
## How to run the integration tests locally and iterate on them ## How to run the integration tests locally and iterate on them
We have integration tests written in Ruby, Python, Go and Rust. We have integration tests written in Ruby, Python, Go and Rust.
Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround. Below are the steps to run them in a developer-friendly way that allows iterating and quick turnaround.
Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependencies available for you and you can modify the test code and immediately rerun your test in the interactive shell. Hear me out, this should be easy, it will involve opening a shell into a container with all the necessary dependencies available for you and you can modify the test code and immediately rerun your test in the interactive shell.
@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
Once the environment is ready, you can run the tests by running Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation` Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app/ && pytest` Python: `cd /app && python3 tests/python/tests.py`
Rust: `cd /app/tests/rust && cargo run` Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test` Go: `cd /app/tests/go && /usr/local/go/bin/go test`

View File

@@ -200,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> { impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let formatted_labels = self let mut sorted_labels: Vec<_> = self.labels.iter().collect();
.labels sorted_labels.sort_by_key(|&(key, _)| key);
let formatted_labels = sorted_labels
.iter() .iter()
.map(|(key, value)| format!("{}=\"{}\"", key, value)) .map(|(key, value)| format!("{}=\"{}\"", key, value))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(","); .join(",");
write!( write!(
f, f,
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n", "{name}{{{formatted_labels}}} {value}",
name = format_args!("pgcat_{}", self.name), name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
formatted_labels = formatted_labels, formatted_labels = formatted_labels,
value = self.value value = self.value
) )
@@ -247,7 +246,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string()); labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone()); labels.insert("username", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels) Self::from_name(&format!("databases_{}", name), value, labels)
} }
@@ -264,7 +263,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string()); labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone()); labels.insert("username", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels) Self::from_name(&format!("servers_{}", name), value, labels)
} }
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string()); labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone()); labels.insert("username", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels) Self::from_name(&format!("stats_{}", name), value, labels)
} }
@@ -288,6 +288,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("pools_{}", name), value, labels) Self::from_name(&format!("pools_{}", name), value, labels)
} }
fn get_header(&self) -> String {
format!(
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
)
}
} }
async fn prometheus_stats( async fn prometheus_stats(
@@ -313,6 +322,7 @@ async fn prometheus_stats(
// Adds metrics shown in a SHOW STATS admin command. // Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) { fn push_address_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
@@ -322,7 +332,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_address(address, &key, value) PrometheusMetric::<u64>::from_address(address, &key, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(key)
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -330,33 +343,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW POOLS admin command. // Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) { fn push_pool_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
let pool_stats = PoolStats::construct_pool_lookup(); let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() { for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() { for (name, value) in stats.clone() {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value) PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(name)
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for ({})", name, *pool_id); debug!("Metric {} not implemented for ({})", name, *pool_id);
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW DATABASES admin command. // Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) { fn push_database_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone(); let pool_config = pool.settings.clone();
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
let address = pool.address(shard, server); let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server); let pool_state = pool.pool_state(shard, server);
let metrics = vec![ let metrics = vec![
("pool_size", pool_config.user.pool_size), ("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections), ("current_connections", pool_state.connections),
@@ -365,7 +398,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value) PrometheusMetric::<u32>::from_database_info(address, key, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -373,6 +409,14 @@ fn push_database_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW SERVERS admin command. // Adds relevant metrics shown in a SHOW SERVERS admin command.
@@ -405,7 +449,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
crate::stats::ServerState::Idle => entry.idle_count += 1, crate::stats::ServerState::Idle => entry.idle_count += 1,
} }
} }
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
@@ -428,7 +472,10 @@ fn push_server_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value) PrometheusMetric::<u64>::from_server_info(address, key, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -437,6 +484,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
pub async fn start_metric_server(http_addr: SocketAddr) { pub async fn start_metric_server(http_addr: SocketAddr) {

View File

@@ -23,11 +23,11 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
echo "Interactive test environment ready" echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:" echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}" echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}" echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}" echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}" echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again" echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
echo "You can rebuild PgCat from within the container by running" echo "You can rebuild PgCat from within the container by running"
echo -e " ${GREEN}cargo build${RESET}" echo -e " ${GREEN}cargo build${RESET}"
echo "and then run the tests again" echo "and then run the tests again"
echo "===================================" echo "==================================="

View File

@@ -1,3 +1,2 @@
pytest
psycopg2==2.9.3 psycopg2==2.9.3
psutil==5.9.1 psutil==5.9.1

View File

@@ -1,29 +1,83 @@
from typing import Tuple
import psycopg2
import psutil
import os import os
import signal import signal
import time import time
import psycopg2
import utils
SHUTDOWN_TIMEOUT = 5 SHUTDOWN_TIMEOUT = 5
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()
def test_normal_db_access(): def test_normal_db_access():
utils.pgcat_start() pgcat_start()
conn, cur = utils.connect_db(autocommit=False) conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1") cur.execute("SELECT 1")
res = cur.fetchall() res = cur.fetchall()
print(res) print(res)
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
def test_admin_db_access(): def test_admin_db_access():
conn, cur = utils.connect_db(admin=True) conn, cur = connect_db(admin=True)
cur.execute("SHOW POOLS") cur.execute("SHOW POOLS")
res = cur.fetchall() res = cur.fetchall()
print(res) print(res)
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
def test_shutdown_logic(): def test_shutdown_logic():
@@ -32,17 +86,17 @@ def test_shutdown_logic():
# NO ACTIVE QUERIES SIGINT HANDLING # NO ACTIVE QUERIES SIGINT HANDLING
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and send query (not in transaction) # Create client connection and send query (not in transaction)
conn, cur = utils.connect_db() conn, cur = connect_db()
cur.execute("BEGIN;") cur.execute("BEGIN;")
cur.execute("SELECT 1;") cur.execute("SELECT 1;")
cur.execute("COMMIT;") cur.execute("COMMIT;")
# Send sigint to pgcat # Send sigint to pgcat
utils.pg_cat_send_signal(signal.SIGINT) pg_cat_send_signal(signal.SIGINT)
time.sleep(1) time.sleep(1)
# Check that any new queries fail after sigint since server should close with no active transactions # Check that any new queries fail after sigint since server should close with no active transactions
@@ -54,18 +108,18 @@ def test_shutdown_logic():
# Fail if query execution succeeded # Fail if query execution succeeded
raise Exception("Server not closed after sigint") raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - -
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND # NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and begin transaction # Create client connection and begin transaction
conn, cur = utils.connect_db() conn, cur = connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True) admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;") cur.execute("BEGIN;")
cur.execute("SELECT 1;") cur.execute("SELECT 1;")
@@ -84,24 +138,24 @@ def test_shutdown_logic():
# Fail if query execution succeeded # Fail if query execution succeeded
raise Exception("Server not closed after sigint") raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur) cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH SIGINT # HANDLE TRANSACTION WITH SIGINT
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and begin transaction # Create client connection and begin transaction
conn, cur = utils.connect_db() conn, cur = connect_db()
cur.execute("BEGIN;") cur.execute("BEGIN;")
cur.execute("SELECT 1;") cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction # Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT) pg_cat_send_signal(signal.SIGINT)
time.sleep(1) time.sleep(1)
# Check that any new queries succeed after sigint since server should still allow transaction to complete # Check that any new queries succeed after sigint since server should still allow transaction to complete
@@ -111,18 +165,18 @@ def test_shutdown_logic():
# Fail if query fails since server closed # Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror) raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND # HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and begin transaction # Create client connection and begin transaction
conn, cur = utils.connect_db() conn, cur = connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True) admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;") cur.execute("BEGIN;")
cur.execute("SELECT 1;") cur.execute("SELECT 1;")
@@ -140,30 +194,30 @@ def test_shutdown_logic():
# Fail if query fails since server closed # Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror) raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur) cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - -
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN # NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and begin transaction # Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db() transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;") transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;") transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction # Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT) pg_cat_send_signal(signal.SIGINT)
time.sleep(1) time.sleep(1)
start = time.perf_counter() start = time.perf_counter()
try: try:
conn, cur = utils.connect_db() conn, cur = connect_db()
cur.execute("SELECT 1;") cur.execute("SELECT 1;")
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
except psycopg2.OperationalError as e: except psycopg2.OperationalError as e:
time_taken = time.perf_counter() - start time_taken = time.perf_counter() - start
if time_taken > 0.1: if time_taken > 0.1:
@@ -173,49 +227,49 @@ def test_shutdown_logic():
else: else:
raise Exception("Able connect to database during shutdown") raise Exception("Able connect to database during shutdown")
utils.cleanup_conn(transaction_conn, transaction_cur) cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - -
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN # ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and begin transaction # Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db() transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;") transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;") transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction # Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT) pg_cat_send_signal(signal.SIGINT)
time.sleep(1) time.sleep(1)
try: try:
conn, cur = utils.connect_db(admin=True) conn, cur = connect_db(admin=True)
cur.execute("SHOW DATABASES;") cur.execute("SHOW DATABASES;")
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
except psycopg2.OperationalError as e: except psycopg2.OperationalError as e:
raise Exception(e) raise Exception(e)
utils.cleanup_conn(transaction_conn, transaction_cur) cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - -
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN # ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and begin transaction # Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db() transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;") transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;") transaction_cur.execute("SELECT 1;")
admin_conn, admin_cur = utils.connect_db(admin=True) admin_conn, admin_cur = connect_db(admin=True)
admin_cur.execute("SHOW DATABASES;") admin_cur.execute("SHOW DATABASES;")
# Send sigint to pgcat while still in transaction # Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT) pg_cat_send_signal(signal.SIGINT)
time.sleep(1) time.sleep(1)
try: try:
@@ -223,24 +277,24 @@ def test_shutdown_logic():
except psycopg2.OperationalError as e: except psycopg2.OperationalError as e:
raise Exception("Could not execute admin command:", e) raise Exception("Could not execute admin command:", e)
utils.cleanup_conn(transaction_conn, transaction_cur) cleanup_conn(transaction_conn, transaction_cur)
utils.cleanup_conn(admin_conn, admin_cur) cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - -
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT # HANDLE SHUTDOWN TIMEOUT WITH SIGINT
# Start pgcat # Start pgcat
utils.pgcat_start() pgcat_start()
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached # Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
conn, cur = utils.connect_db() conn, cur = connect_db()
cur.execute("BEGIN;") cur.execute("BEGIN;")
cur.execute("SELECT 1;") cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction # Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT) pg_cat_send_signal(signal.SIGINT)
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds # pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
time.sleep(SHUTDOWN_TIMEOUT + 1) time.sleep(SHUTDOWN_TIMEOUT + 1)
@@ -254,5 +308,12 @@ def test_shutdown_logic():
# Fail if query execution succeeded # Fail if query execution succeeded
raise Exception("Server not closed after sigint and expected timeout") raise Exception("Server not closed after sigint and expected timeout")
utils.cleanup_conn(conn, cur) cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM) pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
test_normal_db_access()
test_admin_db_access()
test_shutdown_logic()

View File

@@ -1,60 +0,0 @@
from typing import Tuple
import os
import psutil
import signal
import time
import psycopg2
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()