Compare commits

..

6 Commits

Author SHA1 Message Date
Mostafa
b24e5395be fix lint issues 2024-09-05 08:19:08 -05:00
Mostafa
53b6b56563 Resolve conflicts 2024-09-05 08:15:18 -05:00
Andrew Jackson
f73d15f82c Fix CI script to allow consecutive runs locally (#793)
Co-authored-by: CommanderKeynes <andrewjackson947@gmail.coma>
2024-09-05 08:01:33 -05:00
Curtis Myzie
1cc71f3990 Remove lint 2024-05-17 19:19:00 +06:00
Curtis Myzie
baa6f661aa Remove lint 2024-05-17 19:18:38 +06:00
Curtis Myzie
ae2b83d239 Prometheus metrics updates:
* Add username label to deconflict metrics that would otherwise
   have duplicate labels across different pools.
 * Group metrics by name and only print HELP and TYPE once per
   metric name.
 * Sort labels for a deterministic output.
2024-05-17 19:12:37 +06:00
9 changed files with 198 additions and 140 deletions

View File

@@ -26,6 +26,7 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Start Toxiproxy
kill -9 $(pgrep toxiproxy) || true
LOG_LEVEL=error toxiproxy-server &
sleep 1
@@ -106,7 +107,7 @@ cd ../..
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
#
pip3 install -r tests/python/requirements.txt
pytest || exit 1
python3 tests/python/tests.py || exit 1
#
@@ -177,3 +178,6 @@ killall pgcat -s SIGINT
# Allow for graceful shutdown
sleep 1
kill -9 $(pgrep toxiproxy)
sleep 1

1
.gitignore vendored
View File

@@ -11,4 +11,3 @@ dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv
**/__pycache__

View File

@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app/ && pytest`
Python: `cd /app && python3 tests/python/tests.py`
Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test`

View File

@@ -200,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let formatted_labels = self
.labels
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
sorted_labels.sort_by_key(|&(key, _)| key);
let formatted_labels = sorted_labels
.iter()
.map(|(key, value)| format!("{}=\"{}\"", key, value))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
"{name}{{{formatted_labels}}} {value}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
formatted_labels = formatted_labels,
value = self.value
)
@@ -247,7 +246,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
labels.insert("username", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels)
}
@@ -264,7 +263,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
labels.insert("username", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels)
}
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
labels.insert("username", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels)
}
@@ -288,6 +288,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("pools_{}", name), value, labels)
}
fn get_header(&self) -> String {
format!(
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
)
}
}
async fn prometheus_stats(
@@ -313,6 +322,7 @@ async fn prometheus_stats(
// Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
@@ -322,7 +332,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_address(address, &key, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(key)
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -330,33 +343,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(name)
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for ({})", name, *pool_id);
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone();
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server);
let metrics = vec![
("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections),
@@ -365,7 +398,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -373,6 +409,14 @@ fn push_database_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
// Adds relevant metrics shown in a SHOW SERVERS admin command.
@@ -405,7 +449,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
}
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
@@ -428,7 +472,10 @@ fn push_server_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
@@ -437,6 +484,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}
pub async fn start_metric_server(http_addr: SocketAddr) {

View File

@@ -23,7 +23,7 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"

View File

@@ -1,3 +1,2 @@
pytest
psycopg2==2.9.3
psutil==5.9.1

View File

@@ -1,29 +1,83 @@
from typing import Tuple
import psycopg2
import psutil
import os
import signal
import time
import psycopg2
import utils
SHUTDOWN_TIMEOUT = 5
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()
def test_normal_db_access():
utils.pgcat_start()
conn, cur = utils.connect_db(autocommit=False)
pgcat_start()
conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
def test_admin_db_access():
conn, cur = utils.connect_db(admin=True)
conn, cur = connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
def test_shutdown_logic():
@@ -32,17 +86,17 @@ def test_shutdown_logic():
# NO ACTIVE QUERIES SIGINT HANDLING
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and send query (not in transaction)
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
cur.execute("COMMIT;")
# Send sigint to pgcat
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries fail after sigint since server should close with no active transactions
@@ -54,18 +108,18 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -84,24 +138,24 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries succeed after sigint since server should still allow transaction to complete
@@ -111,18 +165,18 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -140,30 +194,30 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
start = time.perf_counter()
try:
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("SELECT 1;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
time_taken = time.perf_counter() - start
if time_taken > 0.1:
@@ -173,49 +227,49 @@ def test_shutdown_logic():
else:
raise Exception("Able connect to database during shutdown")
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
conn, cur = utils.connect_db(admin=True)
conn, cur = connect_db(admin=True)
cur.execute("SHOW DATABASES;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
raise Exception(e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
admin_conn, admin_cur = utils.connect_db(admin=True)
admin_conn, admin_cur = connect_db(admin=True)
admin_cur.execute("SHOW DATABASES;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
@@ -223,24 +277,24 @@ def test_shutdown_logic():
except psycopg2.OperationalError as e:
raise Exception("Could not execute admin command:", e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
time.sleep(SHUTDOWN_TIMEOUT + 1)
@@ -254,5 +308,12 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint and expected timeout")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
test_normal_db_access()
test_admin_db_access()
test_shutdown_logic()

View File

@@ -1,60 +0,0 @@
from typing import Tuple
import os
import psutil
import signal
import time
import psycopg2
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()