A couple things (#397)

* Format cleanup

* fmt

* finally
This commit is contained in:
Lev Kokotov
2023-04-10 14:51:01 -07:00
committed by GitHub
parent a62f6b0eea
commit 692353c839
13 changed files with 366 additions and 100 deletions

View File

@@ -1,4 +1,4 @@
use crate::errors::Error;
use crate::errors::{ClientIdentifier, Error};
use crate::pool::BanReason;
/// Handle clients by pretending to be a PostgreSQL server.
use bytes::{Buf, BufMut, BytesMut};
@@ -12,7 +12,7 @@ use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::auth_passthrough::AuthPassthrough;
use crate::auth_passthrough::refetch_auth_hash;
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
use crate::constants::*;
use crate::messages::*;
@@ -202,7 +202,7 @@ pub async fn client_entrypoint(
// Client probably disconnected rejecting our plain text connection.
Ok((ClientConnectionType::Tls, _))
| Ok((ClientConnectionType::CancelQuery, _)) => Err(Error::ProtocolSyncError(
format!("Bad postgres client (plain)"),
"Bad postgres client (plain)".into(),
)),
Err(err) => Err(err),
@@ -369,28 +369,14 @@ pub async fn startup_tls(
}
// Bad Postgres client.
Ok((ClientConnectionType::Tls, _)) | Ok((ClientConnectionType::CancelQuery, _)) => Err(
Error::ProtocolSyncError(format!("Bad postgres client (tls)")),
),
Ok((ClientConnectionType::Tls, _)) | Ok((ClientConnectionType::CancelQuery, _)) => {
Err(Error::ProtocolSyncError("Bad postgres client (tls)".into()))
}
Err(err) => Err(err),
}
}
async fn refetch_auth_hash(pool: &ConnectionPool) -> Result<String, Error> {
let address = pool.address(0, 0);
if let Some(apt) = AuthPassthrough::from_pool_settings(&pool.settings) {
let hash = apt.fetch_hash(address).await?;
return Ok(hash);
}
Err(Error::ClientError(format!(
"Could not obtain hash for {{ username: {:?}, database: {:?} }}. Auth passthrough not enabled.",
address.username, address.database
)))
}
impl<S, T> Client<S, T>
where
S: tokio::io::AsyncRead + std::marker::Unpin,
@@ -418,7 +404,7 @@ where
Some(user) => user,
None => {
return Err(Error::ClientError(
"Missing user parameter on client startup".to_string(),
"Missing user parameter on client startup".into(),
))
}
};
@@ -433,6 +419,8 @@ where
None => "pgcat",
};
let client_identifier = ClientIdentifier::new(&application_name, &username, &pool_name);
let admin = ["pgcat", "pgbouncer"]
.iter()
.filter(|db| *db == pool_name)
@@ -463,7 +451,12 @@ where
let code = match read.read_u8().await {
Ok(p) => p,
Err(_) => return Err(Error::SocketError(format!("Error reading password code from client {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name))),
Err(_) => {
return Err(Error::ClientSocketError(
"password code".into(),
client_identifier,
))
}
};
// PasswordMessage
@@ -476,19 +469,30 @@ where
let len = match read.read_i32().await {
Ok(len) => len,
Err(_) => return Err(Error::SocketError(format!("Error reading password message length from client {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name))),
Err(_) => {
return Err(Error::ClientSocketError(
"password message length".into(),
client_identifier,
))
}
};
let mut password_response = vec![0u8; (len - 4) as usize];
match read.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError(format!("Error reading password message from client {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name))),
Err(_) => {
return Err(Error::ClientSocketError(
"password message".into(),
client_identifier,
))
}
};
// Authenticate admin user.
let (transaction_mode, server_info) = if admin {
let config = get_config();
// Compare server and client hashes.
let password_hash = md5_hash_password(
&config.general.admin_username,
@@ -497,10 +501,12 @@ where
);
if password_hash != password_response {
warn!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name);
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
warn!("{}", error);
wrong_password(&mut write, username).await?;
return Err(Error::ClientError(format!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name)));
return Err(error);
}
(false, generate_server_info_for_admin())
@@ -519,7 +525,10 @@ where
)
.await?;
return Err(Error::ClientError(format!("Invalid pool name {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name)));
return Err(Error::ClientGeneralError(
"Invalid pool name".into(),
client_identifier,
));
}
};
@@ -530,16 +539,23 @@ where
Some(md5_hash_password(username, password, &salt))
} else {
if !get_config().is_auth_query_configured() {
return Err(Error::ClientError(format!("Client auth not possible, no cleartext password set for username: {:?} in config and auth passthrough (query_auth) is not set up.", username)));
return Err(Error::ClientAuthImpossible(username.into()));
}
let mut hash = (*pool.auth_hash.read()).clone();
if hash.is_none() {
warn!("Query auth configured but no hash password found for pool {}. Will try to refetch it.", pool_name);
warn!(
"Query auth configured \
but no hash password found \
for pool {}. Will try to refetch it.",
pool_name
);
match refetch_auth_hash(&pool).await {
Ok(fetched_hash) => {
warn!("Password for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, obtained. Updating.", username, pool_name, application_name);
warn!("Password for {}, obtained. Updating.", client_identifier);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash.clone());
@@ -547,16 +563,12 @@ where
hash = Some(fetched_hash);
}
Err(err) => {
return Err(
Error::ClientError(
format!("No cleartext password set, and no auth passthrough could not obtain the hash from server for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, the error was: {:?}",
username,
pool_name,
application_name,
err)
)
);
return Err(Error::ClientAuthPassthroughError(
err.to_string(),
client_identifier,
));
}
}
};
@@ -570,20 +582,31 @@ where
//
// @TODO: we could end up fetching again the same password twice (see above).
if password_hash.unwrap() != password_response {
warn!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, will try to refetch it.", username, pool_name, application_name);
warn!(
"Invalid password {}, will try to refetch it.",
client_identifier
);
let fetched_hash = refetch_auth_hash(&pool).await?;
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
// Ok password changed in server an auth is possible.
if new_password_hash == password_response {
warn!("Password for {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}, changed in server. Updating.", username, pool_name, application_name);
warn!(
"Password for {}, changed in server. Updating.",
client_identifier
);
{
let mut pool_auth_hash = pool.auth_hash.write();
*pool_auth_hash = Some(fetched_hash);
}
} else {
wrong_password(&mut write, username).await?;
return Err(Error::ClientError(format!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", username, pool_name, application_name)));
return Err(Error::ClientGeneralError(
"Invalid password".into(),
client_identifier,
));
}
}
@@ -753,9 +776,9 @@ where
&mut self.write,
"terminating connection due to administrator command"
).await?;
self.stats.disconnect();
return Ok(())
self.stats.disconnect();
return Ok(());
}
// Admin clients ignore shutdown.
@@ -928,11 +951,26 @@ where
error!("Got Sync message but failed to get a connection from the pool");
self.buffer.clear();
}
error_response(&mut self.write, "could not get connection from the pool")
.await?;
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
error!(
"Could not get connection from pool: \
{{ \
pool_name: {:?}, \
username: {:?}, \
shard: {:?}, \
role: \"{:?}\", \
error: \"{:?}\" \
}}",
self.pool_name,
self.username,
query_router.shard(),
query_router.role(),
err
);
continue;
}
};
@@ -999,11 +1037,25 @@ where
Err(_) => {
// Client idle in transaction timeout
error_response(&mut self.write, "idle transaction timeout").await?;
error!("Client idle in transaction timeout: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\"}}", self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role());
error!(
"Client idle in transaction timeout: \
{{ \
pool_name: {}, \
username: {}, \
shard: {}, \
role: \"{:?}\" \
}}",
self.pool_name,
self.username,
query_router.shard(),
query_router.role()
);
break;
}
}
}
Some(message) => {
initial_message = None;
message