Compare commits

..

5 Commits

Author SHA1 Message Date
Mostafa
b24e5395be fix lint issues 2024-09-05 08:19:08 -05:00
Mostafa
53b6b56563 Resolve conflicts 2024-09-05 08:15:18 -05:00
Curtis Myzie
1cc71f3990 Remove lint 2024-05-17 19:19:00 +06:00
Curtis Myzie
baa6f661aa Remove lint 2024-05-17 19:18:38 +06:00
Curtis Myzie
ae2b83d239 Prometheus metrics updates:
* Add username label to deconflict metrics that would otherwise
   have duplicate labels across different pools.
 * Group metrics by name and only print HELP and TYPE once per
   metric name.
 * Sort labels for a deterministic output.
2024-05-17 19:12:37 +06:00
17 changed files with 274 additions and 481 deletions

View File

@@ -107,7 +107,7 @@ cd ../..
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
#
pip3 install -r tests/python/requirements.txt
pytest || exit 1
python3 tests/python/tests.py || exit 1
#

View File

@@ -22,7 +22,7 @@ jobs:
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
# yamllint (https://github.com/adrienverge/yamllint) which require Python
- name: Set up Python
uses: actions/setup-python@v5.1.0
uses: actions/setup-python@v4.1.0
with:
python-version: 3.7
@@ -43,7 +43,7 @@ jobs:
run: ct lint --config ct.yaml
- name: Create kind cluster
uses: helm/kind-action@v1.10.0
uses: helm/kind-action@v1.7.0
if: steps.list-changed.outputs.changed == 'true'
- name: Run chart-testing (install)

View File

@@ -1,9 +1,6 @@
name: pgcat package (deb)
on:
push:
tags:
- v*
workflow_dispatch:
inputs:
packageVersion:
@@ -19,14 +16,6 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Set package version
if: github.event_name == 'push' # For push event
run: |
TAG=${{ github.ref_name }}
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
- name: Set package version (manual dispatch)
if: github.event_name == 'workflow_dispatch' # For manual dispatch
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
@@ -50,10 +39,10 @@ jobs:
export ARCH=arm64
fi
bash utilities/deb.sh ${{ env.packageVersion }}
bash utilities/deb.sh ${{ inputs.packageVersion }}
deb-s3 upload \
--lock \
--bucket apt.postgresml.org \
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
--codename $(lsb_release -cs)

1
.gitignore vendored
View File

@@ -11,4 +11,3 @@ dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv
**/__pycache__

View File

@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
Once the environment is ready, you can run the tests by running
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
Python: `cd /app/ && pytest`
Python: `cd /app && python3 tests/python/tests.py`
Rust: `cd /app/tests/rust && cargo run`
Go: `cd /app/tests/go && /usr/local/go/bin/go test`

4
Cargo.lock generated
View File

@@ -1526,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.51.0"
version = "0.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
dependencies = [
"log",
"sqlparser_derive",

View File

@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = { version = "0.51", features = ["visitor"] }
sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"

View File

@@ -7,7 +7,3 @@ systemctl enable pgcat
if ! id pgcat 2> /dev/null; then
useradd -s /usr/bin/false pgcat
fi
if [ -f /etc/pgcat.toml ]; then
systemctl start pgcat
fi

View File

@@ -1,4 +1,3 @@
use crate::config::AuthType;
use crate::errors::Error;
use crate::pool::ConnectionPool;
use crate::server::Server;
@@ -72,7 +71,6 @@ impl AuthPassthrough {
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
let auth_user = crate::config::User {
username: self.user.clone(),
auth_type: AuthType::MD5,
password: Some(self.password.clone()),
server_username: None,
server_password: None,

View File

@@ -14,9 +14,7 @@ use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
use crate::auth_passthrough::refetch_auth_hash;
use crate::config::{
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
};
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
use crate::constants::*;
use crate::messages::*;
use crate::plugins::PluginOutput;
@@ -465,8 +463,8 @@ where
.count()
== 1;
// Kick any client that's not admin while we're in admin-only mode.
if !admin && admin_only {
// Kick any client that's not admin while we're in admin-only mode.
debug!(
"Rejecting non-admin connection to {} when in admin only mode",
pool_name
@@ -483,76 +481,72 @@ where
let process_id: i32 = rand::random();
let secret_key: i32 = rand::random();
// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
let mut prepared_statements_enabled = false;
// Authenticate admin user.
let (transaction_mode, mut server_parameters) = if admin {
let config = get_config();
// TODO: Add SASL support.
// Perform MD5 authentication.
match config.general.admin_auth_type {
AuthType::Trust => (),
AuthType::MD5 => {
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// Compare server and client hashes.
let password_hash = md5_hash_password(
&config.general.admin_username,
&config.general.admin_password,
&salt,
);
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
if password_hash != password_response {
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
warn!("{}", error);
wrong_password(&mut write, username).await?;
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
// Compare server and client hashes.
let password_hash = md5_hash_password(
&config.general.admin_username,
&config.general.admin_password,
&salt,
);
if password_hash != password_response {
let error =
Error::ClientGeneralError("Invalid password".into(), client_identifier);
warn!("{}", error);
wrong_password(&mut write, username).await?;
return Err(error);
}
}
return Err(error);
}
(false, generate_server_parameters_for_admin())
}
// Authenticate normal user.
@@ -579,143 +573,92 @@ where
// Obtain the hash to compare, we give preference to that written in cleartext in config
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
// when the pool was created. If there is no hash there, we try to fetch it one more time.
match pool.settings.user.auth_type {
AuthType::Trust => (),
AuthType::MD5 => {
// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge(&mut write).await?;
let password_hash = if let Some(password) = &pool.settings.user.password {
Some(md5_hash_password(username, password, &salt))
} else {
if !get_config().is_auth_query_configured() {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthImpossible(username.into()));
}
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
let mut hash = (*pool.auth_hash.read()).clone();
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
if hash.is_none() {
warn!(
"Query auth configured \
but no hash password found \
for pool {}. Will try to refetch it.",
pool_name
);
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
let password_hash = if let Some(password) = &pool.settings.user.password {
Some(md5_hash_password(username, password, &salt))
} else {
if !get_config().is_auth_query_configured() {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthImpossible(username.into()));
}
let mut hash = (*pool.auth_hash.read()).clone();
if hash.is_none() {
warn!(
"Query auth configured \
but no hash password found \
for pool {}. Will try to refetch it.",
pool_name
);
match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => {
warn!(
"Password for {}, obtained. Updating.",
client_identifier
);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash.clone());
}
hash = Some(fetched_hash);
}
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthPassthroughError(
err.to_string(),
client_identifier,
));
}
}
};
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
};
// Once we have the resulting hash, we compare with what the client gave us.
// If they do not match and auth query is set up, we try to refetch the hash one more time
// to see if the password has changed since the pool was created.
//
// @TODO: we could end up fetching again the same password twice (see above).
if password_hash.unwrap() != password_response {
warn!(
"Invalid password {}, will try to refetch it.",
client_identifier
);
let fetched_hash = match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => fetched_hash,
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(err);
}
};
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
// Ok password changed in server an auth is possible.
if new_password_hash == password_response {
warn!(
"Password for {}, changed in server. Updating.",
client_identifier
);
match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => {
warn!("Password for {}, obtained. Updating.", client_identifier);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash);
*pool_auth_hash = Some(fetched_hash.clone());
}
} else {
hash = Some(fetched_hash);
}
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(Error::ClientGeneralError(
"Invalid password".into(),
return Err(Error::ClientAuthPassthroughError(
err.to_string(),
client_identifier,
));
}
}
};
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
};
// Once we have the resulting hash, we compare with what the client gave us.
// If they do not match and auth query is set up, we try to refetch the hash one more time
// to see if the password has changed since the pool was created.
//
// @TODO: we could end up fetching again the same password twice (see above).
if password_hash.unwrap() != password_response {
warn!(
"Invalid password {}, will try to refetch it.",
client_identifier
);
let fetched_hash = match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => fetched_hash,
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(err);
}
};
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
// Ok password changed in server an auth is possible.
if new_password_hash == password_response {
warn!(
"Password for {}, changed in server. Updating.",
client_identifier
);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash);
}
} else {
wrong_password(&mut write, username).await?;
return Err(Error::ClientGeneralError(
"Invalid password".into(),
client_identifier,
));
}
}
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
prepared_statements_enabled =
transaction_mode && pool.prepared_statement_cache.is_some();

View File

@@ -208,9 +208,6 @@ impl Address {
pub struct User {
pub username: String,
pub password: Option<String>,
#[serde(default = "User::default_auth_type")]
pub auth_type: AuthType,
pub server_username: Option<String>,
pub server_password: Option<String>,
pub pool_size: u32,
@@ -228,7 +225,6 @@ impl Default for User {
User {
username: String::from("postgres"),
password: None,
auth_type: AuthType::MD5,
server_username: None,
server_password: None,
pool_size: 15,
@@ -243,10 +239,6 @@ impl Default for User {
}
impl User {
pub fn default_auth_type() -> AuthType {
AuthType::MD5
}
fn validate(&self) -> Result<(), Error> {
if let Some(min_pool_size) = self.min_pool_size {
if min_pool_size > self.pool_size {
@@ -342,9 +334,6 @@ pub struct General {
pub admin_username: String,
pub admin_password: String,
#[serde(default = "General::default_admin_auth_type")]
pub admin_auth_type: AuthType,
#[serde(default = "General::default_validate_config")]
pub validate_config: bool,
@@ -359,10 +348,6 @@ impl General {
"0.0.0.0".into()
}
pub fn default_admin_auth_type() -> AuthType {
AuthType::MD5
}
pub fn default_port() -> u16 {
5432
}
@@ -471,7 +456,6 @@ impl Default for General {
verify_server_certificate: false,
admin_username: String::from("admin"),
admin_password: String::from("admin"),
admin_auth_type: AuthType::MD5,
validate_config: true,
auth_query: None,
auth_query_user: None,
@@ -492,15 +476,6 @@ pub enum PoolMode {
Session,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
pub enum AuthType {
#[serde(alias = "trust", alias = "Trust")]
Trust,
#[serde(alias = "md5", alias = "MD5")]
MD5,
}
impl std::fmt::Display for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {

View File

@@ -23,7 +23,7 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
echo "Interactive test environment ready"
echo "To run integration tests, you can use the following commands:"
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"

View File

@@ -1,3 +1,2 @@
pytest
psycopg2==2.9.3
psutil==5.9.1

View File

@@ -1,71 +0,0 @@
import utils
import signal
class TestTrustAuth:
@classmethod
def setup_method(cls):
config= """
[general]
host = "0.0.0.0"
port = 6432
admin_username = "admin_user"
admin_password = ""
admin_auth_type = "trust"
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
auth_type = "trust"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"
[pools.sharded_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
]
database = "shard0"
"""
utils.pgcat_generic_start(config)
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_admin_trust_auth(self):
conn, cur = utils.connect_db_trust(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_normal_trust_auth(self):
conn, cur = utils.connect_db_trust(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
class TestMD5Auth:
@classmethod
def setup_method(cls):
utils.pgcat_start()
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_normal_db_access(self):
conn, cur = utils.connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_admin_db_access(self):
conn, cur = utils.connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)

View File

@@ -1,12 +1,84 @@
from typing import Tuple
import psycopg2
import psutil
import os
import signal
import time
import psycopg2
import utils
SHUTDOWN_TIMEOUT = 5
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()
def test_normal_db_access():
pgcat_start()
conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
def test_admin_db_access():
conn, cur = connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
cleanup_conn(conn, cur)
def test_shutdown_logic():
@@ -14,17 +86,17 @@ def test_shutdown_logic():
# NO ACTIVE QUERIES SIGINT HANDLING
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and send query (not in transaction)
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
cur.execute("COMMIT;")
# Send sigint to pgcat
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries fail after sigint since server should close with no active transactions
@@ -36,18 +108,18 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -66,24 +138,24 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint")
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
# Check that any new queries succeed after sigint since server should still allow transaction to complete
@@ -93,18 +165,18 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
conn, cur = utils.connect_db()
admin_conn, admin_cur = utils.connect_db(admin=True)
conn, cur = connect_db()
admin_conn, admin_cur = connect_db(admin=True)
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
@@ -122,30 +194,30 @@ def test_shutdown_logic():
# Fail if query fails since server closed
raise Exception("Server closed while in transaction", e.pgerror)
utils.cleanup_conn(conn, cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
start = time.perf_counter()
try:
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("SELECT 1;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
time_taken = time.perf_counter() - start
if time_taken > 0.1:
@@ -155,49 +227,49 @@ def test_shutdown_logic():
else:
raise Exception("Able connect to database during shutdown")
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
conn, cur = utils.connect_db(admin=True)
conn, cur = connect_db(admin=True)
cur.execute("SHOW DATABASES;")
utils.cleanup_conn(conn, cur)
cleanup_conn(conn, cur)
except psycopg2.OperationalError as e:
raise Exception(e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction
transaction_conn, transaction_cur = utils.connect_db()
transaction_conn, transaction_cur = connect_db()
transaction_cur.execute("BEGIN;")
transaction_cur.execute("SELECT 1;")
admin_conn, admin_cur = utils.connect_db(admin=True)
admin_conn, admin_cur = connect_db(admin=True)
admin_cur.execute("SHOW DATABASES;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
time.sleep(1)
try:
@@ -205,24 +277,24 @@ def test_shutdown_logic():
except psycopg2.OperationalError as e:
raise Exception("Could not execute admin command:", e)
utils.cleanup_conn(transaction_conn, transaction_cur)
utils.cleanup_conn(admin_conn, admin_cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(transaction_conn, transaction_cur)
cleanup_conn(admin_conn, admin_cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
# Start pgcat
utils.pgcat_start()
pgcat_start()
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
conn, cur = utils.connect_db()
conn, cur = connect_db()
cur.execute("BEGIN;")
cur.execute("SELECT 1;")
# Send sigint to pgcat while still in transaction
utils.pg_cat_send_signal(signal.SIGINT)
pg_cat_send_signal(signal.SIGINT)
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
time.sleep(SHUTDOWN_TIMEOUT + 1)
@@ -236,7 +308,12 @@ def test_shutdown_logic():
# Fail if query execution succeeded
raise Exception("Server not closed after sigint and expected timeout")
utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM)
cleanup_conn(conn, cur)
pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -
test_normal_db_access()
test_admin_db_access()
test_shutdown_logic()

View File

@@ -1,110 +0,0 @@
import os
import signal
import time
from typing import Tuple
import tempfile
import psutil
import psycopg2
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def _pgcat_start(config_path: str):
pg_cat_send_signal(signal.SIGTERM)
os.system(f"./target/debug/pgcat {config_path} &")
time.sleep(2)
def pgcat_start():
_pgcat_start(config_path='.circleci/pgcat.toml')
def pgcat_generic_start(config: str):
tmp = tempfile.NamedTemporaryFile()
with open(tmp.name, 'w') as f:
f.write(config)
_pgcat_start(config_path=tmp.name)
def glauth_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if proc.name() == "glauth":
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep glauth'):
raise Exception("glauth not closed after SIGTERM")
def pg_cat_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep pgcat'):
raise Exception("pgcat not closed after SIGTERM")
def connect_db(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
password = "admin_pass"
db = "pgcat"
else:
user = "sharding_user"
password = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def connect_db_trust(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
db = "pgcat"
else:
user = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close()
conn.close()

View File

@@ -24,8 +24,7 @@ GEM
pg (1.3.2)
rainbow (3.1.1)
regexp_parser (2.3.1)
rexml (3.3.6)
strscan
rexml (3.2.5)
rspec (3.11.0)
rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0)
@@ -51,7 +50,6 @@ GEM
rubocop-ast (1.17.0)
parser (>= 3.1.1.0)
ruby-progressbar (1.11.0)
strscan (3.1.0)
toml (0.3.0)
parslet (>= 1.8.0, < 3.0.0)
toxiproxy (2.0.1)