This commit is contained in:
Lev Kokotov
2023-03-30 17:35:32 -07:00
parent 4969abf355
commit ed31053cdb
6 changed files with 42 additions and 14 deletions

View File

@@ -122,6 +122,10 @@ idle_timeout = 40000
# Connect timeout can be overwritten in the pool # Connect timeout can be overwritten in the pool
connect_timeout = 3000 connect_timeout = 3000
# auth_query = "SELECT * FROM public.user_lookup('$1')"
# auth_query_user = "postgres"
# auth_query_password = "postgres"
# User configs are structured as pool.<pool_name>.users.<user_index> # User configs are structured as pool.<pool_name>.users.<user_index>
# This secion holds the credentials for users that may connect to this cluster # This secion holds the credentials for users that may connect to this cluster
[pools.sharded_db.users.0] [pools.sharded_db.users.0]
@@ -130,8 +134,8 @@ username = "sharding_user"
# Postgresql password # Postgresql password
password = "sharding_user" password = "sharding_user"
# Passwords the client can use to connect. Useful for password rotations. # # Passwords the client can use to connect. Useful for password rotations.
secrets = [ "secret_one", "secret_two" ] # secrets = [ "secret_one", "secret_two" ]
# Maximum number of server connections that can be established for this user # Maximum number of server connections that can be established for this user
# The maximum number of connection from a single Pgcat process to any database in the cluster # The maximum number of connection from a single Pgcat process to any database in the cluster

View File

@@ -26,13 +26,19 @@ where
{ {
let config = get_config(); let config = get_config();
debug!("Fetching auth hash");
if config.is_auth_query_configured() { if config.is_auth_query_configured() {
let address = pool.address(0, 0); let address = pool.address(0, 0);
if let Some(apt) = AuthPassthrough::from_pool_settings(&pool.settings) { if let Some(apt) = AuthPassthrough::from_pool_settings(&pool.settings) {
let hash = apt.fetch_hash(address).await?; let hash = apt.fetch_hash(address).await?;
debug!("Auth query succeeded");
return Ok(hash); return Ok(hash);
} }
} else {
debug!("Auth query not configured on pool");
} }
error_response( error_response(
@@ -303,6 +309,7 @@ impl Md5 {
if our_hash != password_hash { if our_hash != password_hash {
wrong_password(write, &self.username).await?; wrong_password(write, &self.username).await?;
Err(Error::ClientError(format!( Err(Error::ClientError(format!(
"Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", "Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}",
self.username, self.pool_name, self.application_name self.username, self.pool_name, self.application_name
@@ -346,12 +353,19 @@ impl Md5 {
))); )));
} }
debug!("Using auth_query");
// Fetch hash from server // Fetch hash from server
let hash = (*pool.auth_hash.read()).clone(); let hash = (*pool.auth_hash.read()).clone();
let hash = match hash { let hash = match hash {
Some(hash) => hash.clone(), Some(hash) => {
debug!("Using existing hash: {}", hash);
hash.clone()
}
None => { None => {
debug!("Pool has no hash set, fetching new one");
let hash = refetch_auth_hash( let hash = refetch_auth_hash(
&pool, &pool,
write, write,
@@ -370,6 +384,8 @@ impl Md5 {
// Compare hashes // Compare hashes
if our_hash != password_hash { if our_hash != password_hash {
debug!("Pool auth query hash did not match, refetching");
// Server hash maybe changed // Server hash maybe changed
let hash = refetch_auth_hash( let hash = refetch_auth_hash(
&pool, &pool,
@@ -382,6 +398,8 @@ impl Md5 {
let our_hash = md5_hash_second_pass(&hash, &self.salt); let our_hash = md5_hash_second_pass(&hash, &self.salt);
if our_hash != password_hash { if our_hash != password_hash {
debug!("Auth query failed, passwords don't match");
wrong_password(write, &self.username).await?; wrong_password(write, &self.username).await?;
Err(Error::ClientError(format!( Err(Error::ClientError(format!(
@@ -402,12 +420,10 @@ impl Md5 {
Ok(()) Ok(())
} }
} else { } else {
wrong_password(write, &self.username).await?; validate_pool(write, pool.clone(), &self.username, &self.pool_name)
.await?;
Err(Error::ClientError(format!( Ok(())
"Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}",
self.username, self.pool_name, self.application_name
)))
} }
} }
} }

View File

@@ -1,7 +1,7 @@
/// Helper functions to send one-off protocol messages /// Helper functions to send one-off protocol messages
/// and handle TcpStream (TCP socket). /// and handle TcpStream (TCP socket).
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::error; use log::{debug, error};
use md5::{Digest, Md5}; use md5::{Digest, Md5};
use socket2::{SockRef, TcpKeepalive}; use socket2::{SockRef, TcpKeepalive};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -234,6 +234,8 @@ pub async fn md5_password_with_hash<S>(stream: &mut S, hash: &str, salt: &[u8])
where where
S: tokio::io::AsyncWrite + std::marker::Unpin, S: tokio::io::AsyncWrite + std::marker::Unpin,
{ {
debug!("Sending hash {} to server", hash);
let password = md5_hash_second_pass(hash, salt); let password = md5_hash_second_pass(hash, salt);
let mut message = BytesMut::with_capacity(password.len() as usize + 5); let mut message = BytesMut::with_capacity(password.len() as usize + 5);

View File

@@ -755,7 +755,9 @@ impl Server {
Arc::new(RwLock::new(None)), Arc::new(RwLock::new(None)),
) )
.await?; .await?;
debug!("Connected!, sending query.");
debug!("Connected!, sending query: {}", query);
server.send(&simple_query(query)).await?; server.send(&simple_query(query)).await?;
let mut message = server.recv().await?; let mut message = server.recv().await?;
@@ -764,6 +766,8 @@ impl Server {
} }
async fn parse_query_message(message: &mut BytesMut) -> Result<Vec<String>, Error> { async fn parse_query_message(message: &mut BytesMut) -> Result<Vec<String>, Error> {
debug!("Parsing query message");
let mut pair = Vec::<String>::new(); let mut pair = Vec::<String>::new();
match message::backend::Message::parse(message) { match message::backend::Message::parse(message) {
Ok(Some(message::backend::Message::RowDescription(_description))) => {} Ok(Some(message::backend::Message::RowDescription(_description))) => {}
@@ -833,6 +837,9 @@ async fn parse_query_message(message: &mut BytesMut) -> Result<Vec<String>, Erro
} }
}; };
} }
debug!("Got auth hash successfully");
Ok(pair) Ok(pair)
} }

View File

@@ -67,7 +67,7 @@ describe "Auth Query" do
end end
context 'and with cleartext passwords not set' do context 'and with cleartext passwords not set' do
let(:config_user) { { 'username' => 'sharding_user', 'password' => 'sharding_user' } } let(:config_user) { { 'username' => 'sharding_user' } }
it 'it uses obtained passwords' do it 'it uses obtained passwords' do
connection_string = processes.pgcat.connection_string("sharded_db", pg_user['username'], pg_user['password']) connection_string = processes.pgcat.connection_string("sharded_db", pg_user['username'], pg_user['password'])
@@ -76,7 +76,7 @@ describe "Auth Query" do
end end
it 'allows passwords to be changed without closing existing connections' do it 'allows passwords to be changed without closing existing connections' do
pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'])) pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'], pg_user['password']))
expect(pgconn.exec("SELECT 1 + 2")).not_to be_nil expect(pgconn.exec("SELECT 1 + 2")).not_to be_nil
Helpers::AuthQuery.exec_in_instances(query: "ALTER USER #{pg_user['username']} WITH ENCRYPTED PASSWORD 'secret2';") Helpers::AuthQuery.exec_in_instances(query: "ALTER USER #{pg_user['username']} WITH ENCRYPTED PASSWORD 'secret2';")
expect(pgconn.exec("SELECT 1 + 4")).not_to be_nil expect(pgconn.exec("SELECT 1 + 4")).not_to be_nil
@@ -84,7 +84,7 @@ describe "Auth Query" do
end end
it 'allows passwords to be changed and that new password is needed when reconnecting' do it 'allows passwords to be changed and that new password is needed when reconnecting' do
pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'])) pgconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'], pg_user['password']))
expect(pgconn.exec("SELECT 1 + 2")).not_to be_nil expect(pgconn.exec("SELECT 1 + 2")).not_to be_nil
Helpers::AuthQuery.exec_in_instances(query: "ALTER USER #{pg_user['username']} WITH ENCRYPTED PASSWORD 'secret2';") Helpers::AuthQuery.exec_in_instances(query: "ALTER USER #{pg_user['username']} WITH ENCRYPTED PASSWORD 'secret2';")
newconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'], 'secret2')) newconn = PG.connect(processes.pgcat.connection_string("sharded_db", pg_user['username'], 'secret2'))

View File

@@ -78,7 +78,6 @@ class PgcatProcess
10.times do 10.times do
Process.kill 0, @pid Process.kill 0, @pid
PG::connect(connection_string || example_connection_string).close PG::connect(connection_string || example_connection_string).close
return self return self
rescue Errno::ESRCH rescue Errno::ESRCH
raise StandardError, "Process #{@pid} died. #{logs}" raise StandardError, "Process #{@pid} died. #{logs}"