mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Implement Trust Authentication (#805)
* Implement Trust Authentication * Remove remaining LDAP stuff * Reverted LDAP changes, Cleaned up tests --------- Co-authored-by: Andrew Jackson <andrewjackson2988@gmail.com> Co-authored-by: CommanderKeynes <andrewjackson947@gmail.coma>
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
use crate::config::AuthType;
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::pool::ConnectionPool;
|
use crate::pool::ConnectionPool;
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
@@ -71,6 +72,7 @@ impl AuthPassthrough {
|
|||||||
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
||||||
let auth_user = crate::config::User {
|
let auth_user = crate::config::User {
|
||||||
username: self.user.clone(),
|
username: self.user.clone(),
|
||||||
|
auth_type: AuthType::MD5,
|
||||||
password: Some(self.password.clone()),
|
password: Some(self.password.clone()),
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
|
|||||||
317
src/client.rs
317
src/client.rs
@@ -14,7 +14,9 @@ use tokio::sync::mpsc::Sender;
|
|||||||
|
|
||||||
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
||||||
use crate::auth_passthrough::refetch_auth_hash;
|
use crate::auth_passthrough::refetch_auth_hash;
|
||||||
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
use crate::config::{
|
||||||
|
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
|
||||||
|
};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::plugins::PluginOutput;
|
use crate::plugins::PluginOutput;
|
||||||
@@ -463,8 +465,8 @@ where
|
|||||||
.count()
|
.count()
|
||||||
== 1;
|
== 1;
|
||||||
|
|
||||||
// Kick any client that's not admin while we're in admin-only mode.
|
|
||||||
if !admin && admin_only {
|
if !admin && admin_only {
|
||||||
|
// Kick any client that's not admin while we're in admin-only mode.
|
||||||
debug!(
|
debug!(
|
||||||
"Rejecting non-admin connection to {} when in admin only mode",
|
"Rejecting non-admin connection to {} when in admin only mode",
|
||||||
pool_name
|
pool_name
|
||||||
@@ -481,72 +483,76 @@ where
|
|||||||
let process_id: i32 = rand::random();
|
let process_id: i32 = rand::random();
|
||||||
let secret_key: i32 = rand::random();
|
let secret_key: i32 = rand::random();
|
||||||
|
|
||||||
// Perform MD5 authentication.
|
|
||||||
// TODO: Add SASL support.
|
|
||||||
let salt = md5_challenge(&mut write).await?;
|
|
||||||
|
|
||||||
let code = match read.read_u8().await {
|
|
||||||
Ok(p) => p,
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password code".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// PasswordMessage
|
|
||||||
if code as char != 'p' {
|
|
||||||
return Err(Error::ProtocolSyncError(format!(
|
|
||||||
"Expected p, got {}",
|
|
||||||
code as char
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let len = match read.read_i32().await {
|
|
||||||
Ok(len) => len,
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message length".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
|
||||||
|
|
||||||
match read.read_exact(&mut password_response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut prepared_statements_enabled = false;
|
let mut prepared_statements_enabled = false;
|
||||||
|
|
||||||
// Authenticate admin user.
|
// Authenticate admin user.
|
||||||
let (transaction_mode, mut server_parameters) = if admin {
|
let (transaction_mode, mut server_parameters) = if admin {
|
||||||
let config = get_config();
|
let config = get_config();
|
||||||
|
// TODO: Add SASL support.
|
||||||
|
// Perform MD5 authentication.
|
||||||
|
match config.general.admin_auth_type {
|
||||||
|
AuthType::Trust => (),
|
||||||
|
AuthType::MD5 => {
|
||||||
|
let salt = md5_challenge(&mut write).await?;
|
||||||
|
|
||||||
// Compare server and client hashes.
|
let code = match read.read_u8().await {
|
||||||
let password_hash = md5_hash_password(
|
Ok(p) => p,
|
||||||
&config.general.admin_username,
|
Err(_) => {
|
||||||
&config.general.admin_password,
|
return Err(Error::ClientSocketError(
|
||||||
&salt,
|
"password code".into(),
|
||||||
);
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if password_hash != password_response {
|
// PasswordMessage
|
||||||
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
if code as char != 'p' {
|
||||||
|
return Err(Error::ProtocolSyncError(format!(
|
||||||
|
"Expected p, got {}",
|
||||||
|
code as char
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
warn!("{}", error);
|
let len = match read.read_i32().await {
|
||||||
wrong_password(&mut write, username).await?;
|
Ok(len) => len,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message length".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
return Err(error);
|
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||||
|
|
||||||
|
match read.read_exact(&mut password_response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Compare server and client hashes.
|
||||||
|
let password_hash = md5_hash_password(
|
||||||
|
&config.general.admin_username,
|
||||||
|
&config.general.admin_password,
|
||||||
|
&salt,
|
||||||
|
);
|
||||||
|
|
||||||
|
if password_hash != password_response {
|
||||||
|
let error =
|
||||||
|
Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
||||||
|
|
||||||
|
warn!("{}", error);
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(false, generate_server_parameters_for_admin())
|
(false, generate_server_parameters_for_admin())
|
||||||
}
|
}
|
||||||
// Authenticate normal user.
|
// Authenticate normal user.
|
||||||
@@ -573,92 +579,143 @@ where
|
|||||||
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
||||||
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
||||||
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
||||||
let password_hash = if let Some(password) = &pool.settings.user.password {
|
match pool.settings.user.auth_type {
|
||||||
Some(md5_hash_password(username, password, &salt))
|
AuthType::Trust => (),
|
||||||
} else {
|
AuthType::MD5 => {
|
||||||
if !get_config().is_auth_query_configured() {
|
// Perform MD5 authentication.
|
||||||
wrong_password(&mut write, username).await?;
|
// TODO: Add SASL support.
|
||||||
return Err(Error::ClientAuthImpossible(username.into()));
|
let salt = md5_challenge(&mut write).await?;
|
||||||
}
|
|
||||||
|
|
||||||
let mut hash = (*pool.auth_hash.read()).clone();
|
let code = match read.read_u8().await {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password code".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if hash.is_none() {
|
// PasswordMessage
|
||||||
warn!(
|
if code as char != 'p' {
|
||||||
"Query auth configured \
|
return Err(Error::ProtocolSyncError(format!(
|
||||||
but no hash password found \
|
"Expected p, got {}",
|
||||||
for pool {}. Will try to refetch it.",
|
code as char
|
||||||
pool_name
|
)));
|
||||||
);
|
}
|
||||||
|
|
||||||
match refetch_auth_hash(&pool).await {
|
let len = match read.read_i32().await {
|
||||||
Ok(fetched_hash) => {
|
Ok(len) => len,
|
||||||
warn!("Password for {}, obtained. Updating.", client_identifier);
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message length".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||||
|
|
||||||
|
match read.read_exact(&mut password_response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let password_hash = if let Some(password) = &pool.settings.user.password {
|
||||||
|
Some(md5_hash_password(username, password, &salt))
|
||||||
|
} else {
|
||||||
|
if !get_config().is_auth_query_configured() {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
return Err(Error::ClientAuthImpossible(username.into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut hash = (*pool.auth_hash.read()).clone();
|
||||||
|
|
||||||
|
if hash.is_none() {
|
||||||
|
warn!(
|
||||||
|
"Query auth configured \
|
||||||
|
but no hash password found \
|
||||||
|
for pool {}. Will try to refetch it.",
|
||||||
|
pool_name
|
||||||
|
);
|
||||||
|
|
||||||
|
match refetch_auth_hash(&pool).await {
|
||||||
|
Ok(fetched_hash) => {
|
||||||
|
warn!(
|
||||||
|
"Password for {}, obtained. Updating.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut pool_auth_hash = pool.auth_hash.write();
|
||||||
|
*pool_auth_hash = Some(fetched_hash.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
hash = Some(fetched_hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(Error::ClientAuthPassthroughError(
|
||||||
|
err.to_string(),
|
||||||
|
client_identifier,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
||||||
|
};
|
||||||
|
|
||||||
|
// Once we have the resulting hash, we compare with what the client gave us.
|
||||||
|
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
||||||
|
// to see if the password has changed since the pool was created.
|
||||||
|
//
|
||||||
|
// @TODO: we could end up fetching again the same password twice (see above).
|
||||||
|
if password_hash.unwrap() != password_response {
|
||||||
|
warn!(
|
||||||
|
"Invalid password {}, will try to refetch it.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
|
let fetched_hash = match refetch_auth_hash(&pool).await {
|
||||||
|
Ok(fetched_hash) => fetched_hash,
|
||||||
|
Err(err) => {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
||||||
|
|
||||||
|
// Ok password changed in server an auth is possible.
|
||||||
|
if new_password_hash == password_response {
|
||||||
|
warn!(
|
||||||
|
"Password for {}, changed in server. Updating.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut pool_auth_hash = pool.auth_hash.write();
|
let mut pool_auth_hash = pool.auth_hash.write();
|
||||||
*pool_auth_hash = Some(fetched_hash.clone());
|
*pool_auth_hash = Some(fetched_hash);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
hash = Some(fetched_hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(err) => {
|
|
||||||
wrong_password(&mut write, username).await?;
|
wrong_password(&mut write, username).await?;
|
||||||
|
return Err(Error::ClientGeneralError(
|
||||||
return Err(Error::ClientAuthPassthroughError(
|
"Invalid password".into(),
|
||||||
err.to_string(),
|
|
||||||
client_identifier,
|
client_identifier,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
|
||||||
};
|
|
||||||
|
|
||||||
// Once we have the resulting hash, we compare with what the client gave us.
|
|
||||||
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
|
||||||
// to see if the password has changed since the pool was created.
|
|
||||||
//
|
|
||||||
// @TODO: we could end up fetching again the same password twice (see above).
|
|
||||||
if password_hash.unwrap() != password_response {
|
|
||||||
warn!(
|
|
||||||
"Invalid password {}, will try to refetch it.",
|
|
||||||
client_identifier
|
|
||||||
);
|
|
||||||
|
|
||||||
let fetched_hash = match refetch_auth_hash(&pool).await {
|
|
||||||
Ok(fetched_hash) => fetched_hash,
|
|
||||||
Err(err) => {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
|
||||||
|
|
||||||
// Ok password changed in server an auth is possible.
|
|
||||||
if new_password_hash == password_response {
|
|
||||||
warn!(
|
|
||||||
"Password for {}, changed in server. Updating.",
|
|
||||||
client_identifier
|
|
||||||
);
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut pool_auth_hash = pool.auth_hash.write();
|
|
||||||
*pool_auth_hash = Some(fetched_hash);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
return Err(Error::ClientGeneralError(
|
|
||||||
"Invalid password".into(),
|
|
||||||
client_identifier,
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
||||||
prepared_statements_enabled =
|
prepared_statements_enabled =
|
||||||
transaction_mode && pool.prepared_statement_cache.is_some();
|
transaction_mode && pool.prepared_statement_cache.is_some();
|
||||||
|
|||||||
@@ -208,6 +208,9 @@ impl Address {
|
|||||||
pub struct User {
|
pub struct User {
|
||||||
pub username: String,
|
pub username: String,
|
||||||
pub password: Option<String>,
|
pub password: Option<String>,
|
||||||
|
|
||||||
|
#[serde(default = "User::default_auth_type")]
|
||||||
|
pub auth_type: AuthType,
|
||||||
pub server_username: Option<String>,
|
pub server_username: Option<String>,
|
||||||
pub server_password: Option<String>,
|
pub server_password: Option<String>,
|
||||||
pub pool_size: u32,
|
pub pool_size: u32,
|
||||||
@@ -225,6 +228,7 @@ impl Default for User {
|
|||||||
User {
|
User {
|
||||||
username: String::from("postgres"),
|
username: String::from("postgres"),
|
||||||
password: None,
|
password: None,
|
||||||
|
auth_type: AuthType::MD5,
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
pool_size: 15,
|
pool_size: 15,
|
||||||
@@ -239,6 +243,10 @@ impl Default for User {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
|
pub fn default_auth_type() -> AuthType {
|
||||||
|
AuthType::MD5
|
||||||
|
}
|
||||||
|
|
||||||
fn validate(&self) -> Result<(), Error> {
|
fn validate(&self) -> Result<(), Error> {
|
||||||
if let Some(min_pool_size) = self.min_pool_size {
|
if let Some(min_pool_size) = self.min_pool_size {
|
||||||
if min_pool_size > self.pool_size {
|
if min_pool_size > self.pool_size {
|
||||||
@@ -334,6 +342,9 @@ pub struct General {
|
|||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_admin_auth_type")]
|
||||||
|
pub admin_auth_type: AuthType,
|
||||||
|
|
||||||
#[serde(default = "General::default_validate_config")]
|
#[serde(default = "General::default_validate_config")]
|
||||||
pub validate_config: bool,
|
pub validate_config: bool,
|
||||||
|
|
||||||
@@ -348,6 +359,10 @@ impl General {
|
|||||||
"0.0.0.0".into()
|
"0.0.0.0".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn default_admin_auth_type() -> AuthType {
|
||||||
|
AuthType::MD5
|
||||||
|
}
|
||||||
|
|
||||||
pub fn default_port() -> u16 {
|
pub fn default_port() -> u16 {
|
||||||
5432
|
5432
|
||||||
}
|
}
|
||||||
@@ -456,6 +471,7 @@ impl Default for General {
|
|||||||
verify_server_certificate: false,
|
verify_server_certificate: false,
|
||||||
admin_username: String::from("admin"),
|
admin_username: String::from("admin"),
|
||||||
admin_password: String::from("admin"),
|
admin_password: String::from("admin"),
|
||||||
|
admin_auth_type: AuthType::MD5,
|
||||||
validate_config: true,
|
validate_config: true,
|
||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
@@ -476,6 +492,15 @@ pub enum PoolMode {
|
|||||||
Session,
|
Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
|
||||||
|
pub enum AuthType {
|
||||||
|
#[serde(alias = "trust", alias = "Trust")]
|
||||||
|
Trust,
|
||||||
|
|
||||||
|
#[serde(alias = "md5", alias = "MD5")]
|
||||||
|
MD5,
|
||||||
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for PoolMode {
|
impl std::fmt::Display for PoolMode {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
|
|||||||
71
tests/python/test_auth.py
Normal file
71
tests/python/test_auth.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
import utils
|
||||||
|
import signal
|
||||||
|
|
||||||
|
class TestTrustAuth:
|
||||||
|
@classmethod
|
||||||
|
def setup_method(cls):
|
||||||
|
config= """
|
||||||
|
[general]
|
||||||
|
host = "0.0.0.0"
|
||||||
|
port = 6432
|
||||||
|
admin_username = "admin_user"
|
||||||
|
admin_password = ""
|
||||||
|
admin_auth_type = "trust"
|
||||||
|
|
||||||
|
[pools.sharded_db.users.0]
|
||||||
|
username = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
auth_type = "trust"
|
||||||
|
pool_size = 10
|
||||||
|
min_pool_size = 1
|
||||||
|
pool_mode = "transaction"
|
||||||
|
|
||||||
|
[pools.sharded_db.shards.0]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432, "primary" ],
|
||||||
|
]
|
||||||
|
database = "shard0"
|
||||||
|
"""
|
||||||
|
utils.pgcat_generic_start(config)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def teardown_method(self):
|
||||||
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
def test_admin_trust_auth(self):
|
||||||
|
conn, cur = utils.connect_db_trust(admin=True)
|
||||||
|
cur.execute("SHOW POOLS")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
def test_normal_trust_auth(self):
|
||||||
|
conn, cur = utils.connect_db_trust(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
class TestMD5Auth:
|
||||||
|
@classmethod
|
||||||
|
def setup_method(cls):
|
||||||
|
utils.pgcat_start()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def teardown_method(self):
|
||||||
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
def test_normal_db_access(self):
|
||||||
|
conn, cur = utils.connect_db(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
def test_admin_db_access(self):
|
||||||
|
conn, cur = utils.connect_db(admin=True)
|
||||||
|
|
||||||
|
cur.execute("SHOW POOLS")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
utils.cleanup_conn(conn, cur)
|
||||||
@@ -1,30 +1,12 @@
|
|||||||
import os
|
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
|
||||||
import utils
|
import utils
|
||||||
|
|
||||||
SHUTDOWN_TIMEOUT = 5
|
SHUTDOWN_TIMEOUT = 5
|
||||||
|
|
||||||
def test_normal_db_access():
|
|
||||||
utils.pgcat_start()
|
|
||||||
conn, cur = utils.connect_db(autocommit=False)
|
|
||||||
cur.execute("SELECT 1")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
utils.cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def test_admin_db_access():
|
|
||||||
conn, cur = utils.connect_db(admin=True)
|
|
||||||
|
|
||||||
cur.execute("SHOW POOLS")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
utils.cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def test_shutdown_logic():
|
def test_shutdown_logic():
|
||||||
|
|
||||||
@@ -256,3 +238,5 @@ def test_shutdown_logic():
|
|||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
utils.cleanup_conn(conn, cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
utils.pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
|
# - - - - - - - - - - - - - - - - - -
|
||||||
|
|||||||
@@ -1,20 +1,49 @@
|
|||||||
from typing import Tuple
|
|
||||||
import os
|
import os
|
||||||
import psutil
|
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
from typing import Tuple
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import psutil
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
|
||||||
PGCAT_HOST = "127.0.0.1"
|
PGCAT_HOST = "127.0.0.1"
|
||||||
PGCAT_PORT = "6432"
|
PGCAT_PORT = "6432"
|
||||||
|
|
||||||
def pgcat_start():
|
|
||||||
|
def _pgcat_start(config_path: str):
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
os.system(f"./target/debug/pgcat {config_path} &")
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_start():
|
||||||
|
_pgcat_start(config_path='.circleci/pgcat.toml')
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_generic_start(config: str):
|
||||||
|
tmp = tempfile.NamedTemporaryFile()
|
||||||
|
with open(tmp.name, 'w') as f:
|
||||||
|
f.write(config)
|
||||||
|
_pgcat_start(config_path=tmp.name)
|
||||||
|
|
||||||
|
|
||||||
|
def glauth_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if proc.name() == "glauth":
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep glauth'):
|
||||||
|
raise Exception("glauth not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
try:
|
try:
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
@@ -54,6 +83,27 @@ def connect_db(
|
|||||||
|
|
||||||
return (conn, cur)
|
return (conn, cur)
|
||||||
|
|
||||||
|
def connect_db_trust(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
|
||||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||||
cur.close()
|
cur.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user