Compare commits

..

2 Commits

Author SHA1 Message Date
Andrew Jackson
bc28d68ec2 fix ci 2024-09-04 21:06:11 -05:00
CommanderKeynes
494e8126e1 Implemented python tests with pytest 2024-09-03 18:38:43 -05:00
15 changed files with 176 additions and 441 deletions

View File

@@ -26,7 +26,6 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
# Start Toxiproxy # Start Toxiproxy
kill -9 $(pgrep toxiproxy) || true
LOG_LEVEL=error toxiproxy-server & LOG_LEVEL=error toxiproxy-server &
sleep 1 sleep 1
@@ -178,6 +177,3 @@ killall pgcat -s SIGINT
# Allow for graceful shutdown # Allow for graceful shutdown
sleep 1 sleep 1
kill -9 $(pgrep toxiproxy)
sleep 1

View File

@@ -22,7 +22,7 @@ jobs:
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and # Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
# yamllint (https://github.com/adrienverge/yamllint) which require Python # yamllint (https://github.com/adrienverge/yamllint) which require Python
- name: Set up Python - name: Set up Python
uses: actions/setup-python@v5.1.0 uses: actions/setup-python@v4.1.0
with: with:
python-version: 3.7 python-version: 3.7
@@ -43,7 +43,7 @@ jobs:
run: ct lint --config ct.yaml run: ct lint --config ct.yaml
- name: Create kind cluster - name: Create kind cluster
uses: helm/kind-action@v1.10.0 uses: helm/kind-action@v1.7.0
if: steps.list-changed.outputs.changed == 'true' if: steps.list-changed.outputs.changed == 'true'
- name: Run chart-testing (install) - name: Run chart-testing (install)

View File

@@ -1,9 +1,6 @@
name: pgcat package (deb) name: pgcat package (deb)
on: on:
push:
tags:
- v*
workflow_dispatch: workflow_dispatch:
inputs: inputs:
packageVersion: packageVersion:
@@ -19,14 +16,6 @@ jobs:
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- name: Set package version
if: github.event_name == 'push' # For push event
run: |
TAG=${{ github.ref_name }}
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
- name: Set package version (manual dispatch)
if: github.event_name == 'workflow_dispatch' # For manual dispatch
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
- uses: actions-rs/toolchain@v1 - uses: actions-rs/toolchain@v1
with: with:
toolchain: stable toolchain: stable
@@ -50,10 +39,10 @@ jobs:
export ARCH=arm64 export ARCH=arm64
fi fi
bash utilities/deb.sh ${{ env.packageVersion }} bash utilities/deb.sh ${{ inputs.packageVersion }}
deb-s3 upload \ deb-s3 upload \
--lock \ --lock \
--bucket apt.postgresml.org \ --bucket apt.postgresml.org \
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \ pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
--codename $(lsb_release -cs) --codename $(lsb_release -cs)

4
Cargo.lock generated
View File

@@ -1526,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "sqlparser" name = "sqlparser"
version = "0.51.0" version = "0.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7" checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
dependencies = [ dependencies = [
"log", "log",
"sqlparser_derive", "sqlparser_derive",

View File

@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1" regex = "1"
num_cpus = "1" num_cpus = "1"
once_cell = "1" once_cell = "1"
sqlparser = { version = "0.51", features = ["visitor"] } sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4" log = "0.4"
arc-swap = "1" arc-swap = "1"
parking_lot = "0.12.1" parking_lot = "0.12.1"

View File

@@ -7,7 +7,3 @@ systemctl enable pgcat
if ! id pgcat 2> /dev/null; then if ! id pgcat 2> /dev/null; then
useradd -s /usr/bin/false pgcat useradd -s /usr/bin/false pgcat
fi fi
if [ -f /etc/pgcat.toml ]; then
systemctl start pgcat
fi

View File

@@ -1,4 +1,3 @@
use crate::config::AuthType;
use crate::errors::Error; use crate::errors::Error;
use crate::pool::ConnectionPool; use crate::pool::ConnectionPool;
use crate::server::Server; use crate::server::Server;
@@ -72,7 +71,6 @@ impl AuthPassthrough {
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> { pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
let auth_user = crate::config::User { let auth_user = crate::config::User {
username: self.user.clone(), username: self.user.clone(),
auth_type: AuthType::MD5,
password: Some(self.password.clone()), password: Some(self.password.clone()),
server_username: None, server_username: None,
server_password: None, server_password: None,

View File

@@ -14,9 +14,7 @@ use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_parameters_for_admin, handle_admin}; use crate::admin::{generate_server_parameters_for_admin, handle_admin};
use crate::auth_passthrough::refetch_auth_hash; use crate::auth_passthrough::refetch_auth_hash;
use crate::config::{ use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
};
use crate::constants::*; use crate::constants::*;
use crate::messages::*; use crate::messages::*;
use crate::plugins::PluginOutput; use crate::plugins::PluginOutput;
@@ -465,8 +463,8 @@ where
.count() .count()
== 1; == 1;
// Kick any client that's not admin while we're in admin-only mode.
if !admin && admin_only { if !admin && admin_only {
// Kick any client that's not admin while we're in admin-only mode.
debug!( debug!(
"Rejecting non-admin connection to {} when in admin only mode", "Rejecting non-admin connection to {} when in admin only mode",
pool_name pool_name
@@ -483,76 +481,72 @@ where
let process_id: i32 = rand::random(); let process_id: i32 = rand::random();
let secret_key: i32 = rand::random(); let secret_key: i32 = rand::random();
// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// PasswordMessage
if code as char != 'p' {
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
let mut prepared_statements_enabled = false; let mut prepared_statements_enabled = false;
// Authenticate admin user. // Authenticate admin user.
let (transaction_mode, mut server_parameters) = if admin { let (transaction_mode, mut server_parameters) = if admin {
let config = get_config(); let config = get_config();
// TODO: Add SASL support.
// Perform MD5 authentication.
match config.general.admin_auth_type {
AuthType::Trust => (),
AuthType::MD5 => {
let salt = md5_challenge(&mut write).await?;
let code = match read.read_u8().await { // Compare server and client hashes.
Ok(p) => p, let password_hash = md5_hash_password(
Err(_) => { &config.general.admin_username,
return Err(Error::ClientSocketError( &config.general.admin_password,
"password code".into(), &salt,
client_identifier, );
))
}
};
// PasswordMessage if password_hash != password_response {
if code as char != 'p' { let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
return Err(Error::ProtocolSyncError(format!(
"Expected p, got {}",
code as char
)));
}
let len = match read.read_i32().await { warn!("{}", error);
Ok(len) => len, wrong_password(&mut write, username).await?;
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize]; return Err(error);
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
// Compare server and client hashes.
let password_hash = md5_hash_password(
&config.general.admin_username,
&config.general.admin_password,
&salt,
);
if password_hash != password_response {
let error =
Error::ClientGeneralError("Invalid password".into(), client_identifier);
warn!("{}", error);
wrong_password(&mut write, username).await?;
return Err(error);
}
}
} }
(false, generate_server_parameters_for_admin()) (false, generate_server_parameters_for_admin())
} }
// Authenticate normal user. // Authenticate normal user.
@@ -579,143 +573,92 @@ where
// Obtain the hash to compare, we give preference to that written in cleartext in config // Obtain the hash to compare, we give preference to that written in cleartext in config
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained // if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
// when the pool was created. If there is no hash there, we try to fetch it one more time. // when the pool was created. If there is no hash there, we try to fetch it one more time.
match pool.settings.user.auth_type { let password_hash = if let Some(password) = &pool.settings.user.password {
AuthType::Trust => (), Some(md5_hash_password(username, password, &salt))
AuthType::MD5 => { } else {
// Perform MD5 authentication. if !get_config().is_auth_query_configured() {
// TODO: Add SASL support. wrong_password(&mut write, username).await?;
let salt = md5_challenge(&mut write).await?; return Err(Error::ClientAuthImpossible(username.into()));
}
let code = match read.read_u8().await { let mut hash = (*pool.auth_hash.read()).clone();
Ok(p) => p,
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// PasswordMessage if hash.is_none() {
if code as char != 'p' { warn!(
return Err(Error::ProtocolSyncError(format!( "Query auth configured \
"Expected p, got {}", but no hash password found \
code as char for pool {}. Will try to refetch it.",
))); pool_name
} );
let len = match read.read_i32().await { match refetch_auth_hash(&pool).await {
Ok(len) => len, Ok(fetched_hash) => {
Err(_) => { warn!("Password for {}, obtained. Updating.", client_identifier);
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
let password_hash = if let Some(password) = &pool.settings.user.password {
Some(md5_hash_password(username, password, &salt))
} else {
if !get_config().is_auth_query_configured() {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthImpossible(username.into()));
}
let mut hash = (*pool.auth_hash.read()).clone();
if hash.is_none() {
warn!(
"Query auth configured \
but no hash password found \
for pool {}. Will try to refetch it.",
pool_name
);
match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => {
warn!(
"Password for {}, obtained. Updating.",
client_identifier
);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash.clone());
}
hash = Some(fetched_hash);
}
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(Error::ClientAuthPassthroughError(
err.to_string(),
client_identifier,
));
}
}
};
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
};
// Once we have the resulting hash, we compare with what the client gave us.
// If they do not match and auth query is set up, we try to refetch the hash one more time
// to see if the password has changed since the pool was created.
//
// @TODO: we could end up fetching again the same password twice (see above).
if password_hash.unwrap() != password_response {
warn!(
"Invalid password {}, will try to refetch it.",
client_identifier
);
let fetched_hash = match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => fetched_hash,
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(err);
}
};
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
// Ok password changed in server an auth is possible.
if new_password_hash == password_response {
warn!(
"Password for {}, changed in server. Updating.",
client_identifier
);
{ {
let mut pool_auth_hash = pool.auth_hash.write(); let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash); *pool_auth_hash = Some(fetched_hash.clone());
} }
} else {
hash = Some(fetched_hash);
}
Err(err) => {
wrong_password(&mut write, username).await?; wrong_password(&mut write, username).await?;
return Err(Error::ClientGeneralError(
"Invalid password".into(), return Err(Error::ClientAuthPassthroughError(
err.to_string(),
client_identifier, client_identifier,
)); ));
} }
} }
};
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
};
// Once we have the resulting hash, we compare with what the client gave us.
// If they do not match and auth query is set up, we try to refetch the hash one more time
// to see if the password has changed since the pool was created.
//
// @TODO: we could end up fetching again the same password twice (see above).
if password_hash.unwrap() != password_response {
warn!(
"Invalid password {}, will try to refetch it.",
client_identifier
);
let fetched_hash = match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => fetched_hash,
Err(err) => {
wrong_password(&mut write, username).await?;
return Err(err);
}
};
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
// Ok password changed in server an auth is possible.
if new_password_hash == password_response {
warn!(
"Password for {}, changed in server. Updating.",
client_identifier
);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash);
}
} else {
wrong_password(&mut write, username).await?;
return Err(Error::ClientGeneralError(
"Invalid password".into(),
client_identifier,
));
} }
} }
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction; let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
prepared_statements_enabled = prepared_statements_enabled =
transaction_mode && pool.prepared_statement_cache.is_some(); transaction_mode && pool.prepared_statement_cache.is_some();

View File

@@ -208,9 +208,6 @@ impl Address {
pub struct User { pub struct User {
pub username: String, pub username: String,
pub password: Option<String>, pub password: Option<String>,
#[serde(default = "User::default_auth_type")]
pub auth_type: AuthType,
pub server_username: Option<String>, pub server_username: Option<String>,
pub server_password: Option<String>, pub server_password: Option<String>,
pub pool_size: u32, pub pool_size: u32,
@@ -228,7 +225,6 @@ impl Default for User {
User { User {
username: String::from("postgres"), username: String::from("postgres"),
password: None, password: None,
auth_type: AuthType::MD5,
server_username: None, server_username: None,
server_password: None, server_password: None,
pool_size: 15, pool_size: 15,
@@ -243,10 +239,6 @@ impl Default for User {
} }
impl User { impl User {
pub fn default_auth_type() -> AuthType {
AuthType::MD5
}
fn validate(&self) -> Result<(), Error> { fn validate(&self) -> Result<(), Error> {
if let Some(min_pool_size) = self.min_pool_size { if let Some(min_pool_size) = self.min_pool_size {
if min_pool_size > self.pool_size { if min_pool_size > self.pool_size {
@@ -342,9 +334,6 @@ pub struct General {
pub admin_username: String, pub admin_username: String,
pub admin_password: String, pub admin_password: String,
#[serde(default = "General::default_admin_auth_type")]
pub admin_auth_type: AuthType,
#[serde(default = "General::default_validate_config")] #[serde(default = "General::default_validate_config")]
pub validate_config: bool, pub validate_config: bool,
@@ -359,10 +348,6 @@ impl General {
"0.0.0.0".into() "0.0.0.0".into()
} }
pub fn default_admin_auth_type() -> AuthType {
AuthType::MD5
}
pub fn default_port() -> u16 { pub fn default_port() -> u16 {
5432 5432
} }
@@ -471,7 +456,6 @@ impl Default for General {
verify_server_certificate: false, verify_server_certificate: false,
admin_username: String::from("admin"), admin_username: String::from("admin"),
admin_password: String::from("admin"), admin_password: String::from("admin"),
admin_auth_type: AuthType::MD5,
validate_config: true, validate_config: true,
auth_query: None, auth_query: None,
auth_query_user: None, auth_query_user: None,
@@ -492,15 +476,6 @@ pub enum PoolMode {
Session, Session,
} }
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
pub enum AuthType {
#[serde(alias = "trust", alias = "Trust")]
Trust,
#[serde(alias = "md5", alias = "MD5")]
MD5,
}
impl std::fmt::Display for PoolMode { impl std::fmt::Display for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {

View File

@@ -200,17 +200,18 @@ struct PrometheusMetric<Value: fmt::Display> {
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> { impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut sorted_labels: Vec<_> = self.labels.iter().collect(); let formatted_labels = self
sorted_labels.sort_by_key(|&(key, _)| key); .labels
let formatted_labels = sorted_labels
.iter() .iter()
.map(|(key, value)| format!("{}=\"{}\"", key, value)) .map(|(key, value)| format!("{}=\"{}\"", key, value))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(","); .join(",");
write!( write!(
f, f,
"{name}{{{formatted_labels}}} {value}", "# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
name = format_args!("pgcat_{}", self.name), name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
formatted_labels = formatted_labels, formatted_labels = formatted_labels,
value = self.value value = self.value
) )
@@ -246,7 +247,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string()); labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone()); labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels) Self::from_name(&format!("databases_{}", name), value, labels)
} }
@@ -263,8 +264,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string()); labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone()); labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels) Self::from_name(&format!("servers_{}", name), value, labels)
} }
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string()); labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone()); labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels) Self::from_name(&format!("stats_{}", name), value, labels)
} }
@@ -288,15 +288,6 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("pools_{}", name), value, labels) Self::from_name(&format!("pools_{}", name), value, labels)
} }
fn get_header(&self) -> String {
format!(
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
)
}
} }
async fn prometheus_stats( async fn prometheus_stats(
@@ -322,7 +313,6 @@ async fn prometheus_stats(
// Adds metrics shown in a SHOW STATS admin command. // Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) { fn push_address_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
@@ -332,10 +322,7 @@ fn push_address_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_address(address, &key, value) PrometheusMetric::<u64>::from_address(address, &key, value)
{ {
grouped_metrics lines.push(prometheus_metric.to_string());
.entry(key)
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -343,53 +330,33 @@ fn push_address_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW POOLS admin command. // Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) { fn push_pool_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
let pool_stats = PoolStats::construct_pool_lookup(); let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() { for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() { for (name, value) in stats.clone() {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value) PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{ {
grouped_metrics lines.push(prometheus_metric.to_string());
.entry(name)
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for ({})", name, *pool_id); debug!("Metric {} not implemented for ({})", name, *pool_id);
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW DATABASES admin command. // Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) { fn push_database_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone(); let pool_config = pool.settings.clone();
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
let address = pool.address(shard, server); let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server); let pool_state = pool.pool_state(shard, server);
let metrics = vec![ let metrics = vec![
("pool_size", pool_config.user.pool_size), ("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections), ("current_connections", pool_state.connections),
@@ -398,10 +365,7 @@ fn push_database_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value) PrometheusMetric::<u32>::from_database_info(address, key, value)
{ {
grouped_metrics lines.push(prometheus_metric.to_string());
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -409,14 +373,6 @@ fn push_database_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW SERVERS admin command. // Adds relevant metrics shown in a SHOW SERVERS admin command.
@@ -449,7 +405,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
crate::stats::ServerState::Idle => entry.idle_count += 1, crate::stats::ServerState::Idle => entry.idle_count += 1,
} }
} }
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
@@ -472,10 +428,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value) PrometheusMetric::<u64>::from_server_info(address, key, value)
{ {
grouped_metrics lines.push(prometheus_metric.to_string());
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -484,14 +437,6 @@ fn push_server_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
pub async fn start_metric_server(http_addr: SocketAddr) { pub async fn start_metric_server(http_addr: SocketAddr) {

0
tests/python/conftest.py Normal file
View File

View File

@@ -1,71 +0,0 @@
import utils
import signal
class TestTrustAuth:
@classmethod
def setup_method(cls):
config= """
[general]
host = "0.0.0.0"
port = 6432
admin_username = "admin_user"
admin_password = ""
admin_auth_type = "trust"
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
auth_type = "trust"
pool_size = 10
min_pool_size = 1
pool_mode = "transaction"
[pools.sharded_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
]
database = "shard0"
"""
utils.pgcat_generic_start(config)
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_admin_trust_auth(self):
conn, cur = utils.connect_db_trust(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_normal_trust_auth(self):
conn, cur = utils.connect_db_trust(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
class TestMD5Auth:
@classmethod
def setup_method(cls):
utils.pgcat_start()
@classmethod
def teardown_method(self):
utils.pg_cat_send_signal(signal.SIGTERM)
def test_normal_db_access(self):
conn, cur = utils.connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_admin_db_access(self):
conn, cur = utils.connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)

View File

@@ -1,12 +1,30 @@
import os
import signal import signal
import time import time
import psycopg2 import psycopg2
import utils import utils
SHUTDOWN_TIMEOUT = 5 SHUTDOWN_TIMEOUT = 5
def test_normal_db_access():
utils.pgcat_start()
conn, cur = utils.connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_admin_db_access():
conn, cur = utils.connect_db(admin=True)
cur.execute("SHOW POOLS")
res = cur.fetchall()
print(res)
utils.cleanup_conn(conn, cur)
def test_shutdown_logic(): def test_shutdown_logic():
@@ -238,5 +256,3 @@ def test_shutdown_logic():
utils.cleanup_conn(conn, cur) utils.cleanup_conn(conn, cur)
utils.pg_cat_send_signal(signal.SIGTERM) utils.pg_cat_send_signal(signal.SIGTERM)
# - - - - - - - - - - - - - - - - - -

View File

@@ -1,47 +1,18 @@
from typing import Tuple
import os import os
import psutil
import signal import signal
import time import time
from typing import Tuple
import tempfile
import psutil
import psycopg2 import psycopg2
PGCAT_HOST = "127.0.0.1" PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432" PGCAT_PORT = "6432"
def _pgcat_start(config_path: str):
pg_cat_send_signal(signal.SIGTERM)
os.system(f"./target/debug/pgcat {config_path} &")
time.sleep(2)
def pgcat_start(): def pgcat_start():
_pgcat_start(config_path='.circleci/pgcat.toml') pg_cat_send_signal(signal.SIGTERM)
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
time.sleep(2)
def pgcat_generic_start(config: str):
tmp = tempfile.NamedTemporaryFile()
with open(tmp.name, 'w') as f:
f.write(config)
_pgcat_start(config_path=tmp.name)
def glauth_send_signal(signal: signal.Signals):
try:
for proc in psutil.process_iter(["pid", "name"]):
if proc.name() == "glauth":
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)
if not os.system('pgrep glauth'):
raise Exception("glauth not closed after SIGTERM")
def pg_cat_send_signal(signal: signal.Signals): def pg_cat_send_signal(signal: signal.Signals):
@@ -83,27 +54,6 @@ def connect_db(
return (conn, cur) return (conn, cur)
def connect_db_trust(
autocommit: bool = True,
admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
if admin:
user = "admin_user"
db = "pgcat"
else:
user = "sharding_user"
db = "sharded_db"
conn = psycopg2.connect(
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
connect_timeout=2,
)
conn.autocommit = autocommit
cur = conn.cursor()
return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor): def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
cur.close() cur.close()

View File

@@ -24,8 +24,7 @@ GEM
pg (1.3.2) pg (1.3.2)
rainbow (3.1.1) rainbow (3.1.1)
regexp_parser (2.3.1) regexp_parser (2.3.1)
rexml (3.3.6) rexml (3.2.5)
strscan
rspec (3.11.0) rspec (3.11.0)
rspec-core (~> 3.11.0) rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0) rspec-expectations (~> 3.11.0)
@@ -51,7 +50,6 @@ GEM
rubocop-ast (1.17.0) rubocop-ast (1.17.0)
parser (>= 3.1.1.0) parser (>= 3.1.1.0)
ruby-progressbar (1.11.0) ruby-progressbar (1.11.0)
strscan (3.1.0)
toml (0.3.0) toml (0.3.0)
parslet (>= 1.8.0, < 3.0.0) parslet (>= 1.8.0, < 3.0.0)
toxiproxy (2.0.1) toxiproxy (2.0.1)