Files
pgcat/src/mirrors.rs

191 lines
6.8 KiB
Rust
Raw Normal View History

use std::sync::Arc;
/// A mirrored PostgreSQL client.
/// Packets arrive to us through a channel from the main client and we send them to the server.
use bb8::Pool;
use bytes::{Bytes, BytesMut};
Auth passthrough (auth_query) (#266) * Add a new exec_simple_query method This adds a new `exec_simple_query` method so we can make 'out of band' queries to servers that don't interfere with pools at all. In order to reuse startup code for making these simple queries, we need to set the stats (`Reporter`) optional, so using these simple queries wont interfere with stats. * Add auth passthough (auth_query) Adds a feature that allows setting auth passthrough for md5 auth. It adds 3 new (general and pool) config parameters: - `auth_query`: An string containing a query that will be executed on boot to obtain the hash of a given user. This query have to use a placeholder `$1`, so pgcat can replace it with the user its trying to fetch the hash from. - `auth_query_user`: The user to use for connecting to the server and executing the auth_query. - `auth_query_password`: The password to use for connecting to the server and executing the auth_query. The configuration can be done either on the general config (so pools share them) or in a per-pool basis. The behavior is, at boot time, when validating server connections, a hash is fetched per server and stored in the pool. When new server connections are created, and no cleartext password is specified, the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries it. When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no hash (we could not obtain one when validating the connection), a new fetch is tried. Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure we refetch the hash and retry auth (so password changes can be done). The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be obtained during connection validation, or the password has change, we can still connect later. * Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
use parking_lot::RwLock;
use crate::config::{get_config, Address, Role, User};
Refactor Pool Stats to be based off of Server/Client stats (#445) What is wrong Stats reported by SHOW POOLS seem to be leaking. We see lingering cl_idle , cl_waiting, and similarly for sv_idle , sv_active. We confirmed that these are reporting issues not actual lingering clients. This behavior is readily reproducible by running while true; do psql "postgres://sharding_user:sharding_user@localhost:6432/sharded_db" -c "SELECT 1" > /dev/null 2>&1 & done Why it happens I wasn't able to get to figure our the reason for the bug but my best guess is that we have race conditions when updating pool-level stats. So even though individual update operations are atomic, we perform a check then update sequence which is not protected by a guard. https://github.com/postgresml/pgcat/blob/main/src/stats/pool.rs#L174-L179 I am also suspecting that using Relaxed ordering might allow this behavior (I changed all operations to use Ordering::SeqCst but still got lingering clients) How to fix Since SHOW POOLS/SHOW SERVER/SHOW CLIENTS all show the current state of the proxy (as opposed to SHOW STATS which show aggregate values), this PR refactors SHOW POOLS to have it construct the results directly from SHOW SERVER and SHOW CLIENT datasets. This reduces the complexity of stat updates and eliminates the need for having locks when updating pool stats as we only care about updating individual client/server states. This will change the semantics of maxwait, so instead of it holding the maxwait time ever encountered by a client (connected or disconnected), it will only consider connected clients which should be okay given PgCat tends to hold on to client connections more than Pgbouncer.
2023-05-23 08:44:49 -05:00
use crate::pool::{ClientServerMap, ServerPool};
use log::{error, info, trace, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender};
pub struct MirroredClient {
address: Address,
user: User,
database: String,
bytes_rx: Receiver<Bytes>,
disconnect_rx: Receiver<()>,
}
impl MirroredClient {
async fn create_pool(&self) -> Pool<ServerPool> {
let config = get_config();
let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
Reimplement prepared statements with LRU cache and statement deduplication (#618) * Initial commit * Cleanup and add stats * Use an arc instead of full clones to store the parse packets * Use mutex instead * fmt * clippy * fmt * fix? * fix? * fmt * typo * Update docs * Refactor custom protocol * fmt * move custom protocol handling to before parsing * Support describe * Add LRU for server side statement cache * rename variable * Refactoring * Move docs * Fix test * fix * Update tests * trigger build * Add more tests * Reorder handling sync * Support when a named describe is sent along with Parse (go pgx) and expecting results * don't talk to client if not needed when client sends Parse * fmt :( * refactor tests * nit * Reduce hashing * Reducing work done to decode describe and parse messages * minor refactor * Merge branch 'main' into zain/reimplment-prepared-statements-with-global-lru-cache * Rewrite extended and prepared protocol message handling to better support mocking response packets and close * An attempt to better handle if there are DDL changes that might break cached plans with ideas about how to further improve it * fix * Minor stats fixed and cleanup * Cosmetic fixes (#64) * Cosmetic fixes * fix test * Change server drop for statement cache error to a `deallocate all` * Updated comments and added new idea for handling DDL changes impacting cached plans * fix test? * Revert test change * trigger build, flakey test * Avoid potential race conditions by changing get_or_insert to promote for pool LRU * remove ps enabled variable on the server in favor of using an option * Add close to the Extended Protocol buffer --------- Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>
2023-10-25 18:11:57 -04:00
let (connection_timeout, idle_timeout, _cfg, prepared_statement_cache_size) =
match config.pools.get(&self.address.pool_name) {
Some(cfg) => (
cfg.connect_timeout.unwrap_or(default),
cfg.idle_timeout.unwrap_or(default),
cfg.clone(),
Reimplement prepared statements with LRU cache and statement deduplication (#618) * Initial commit * Cleanup and add stats * Use an arc instead of full clones to store the parse packets * Use mutex instead * fmt * clippy * fmt * fix? * fix? * fmt * typo * Update docs * Refactor custom protocol * fmt * move custom protocol handling to before parsing * Support describe * Add LRU for server side statement cache * rename variable * Refactoring * Move docs * Fix test * fix * Update tests * trigger build * Add more tests * Reorder handling sync * Support when a named describe is sent along with Parse (go pgx) and expecting results * don't talk to client if not needed when client sends Parse * fmt :( * refactor tests * nit * Reduce hashing * Reducing work done to decode describe and parse messages * minor refactor * Merge branch 'main' into zain/reimplment-prepared-statements-with-global-lru-cache * Rewrite extended and prepared protocol message handling to better support mocking response packets and close * An attempt to better handle if there are DDL changes that might break cached plans with ideas about how to further improve it * fix * Minor stats fixed and cleanup * Cosmetic fixes (#64) * Cosmetic fixes * fix test * Change server drop for statement cache error to a `deallocate all` * Updated comments and added new idea for handling DDL changes impacting cached plans * fix test? * Revert test change * trigger build, flakey test * Avoid potential race conditions by changing get_or_insert to promote for pool LRU * remove ps enabled variable on the server in favor of using an option * Add close to the Extended Protocol buffer --------- Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>
2023-10-25 18:11:57 -04:00
cfg.prepared_statements_cache_size,
),
Reimplement prepared statements with LRU cache and statement deduplication (#618) * Initial commit * Cleanup and add stats * Use an arc instead of full clones to store the parse packets * Use mutex instead * fmt * clippy * fmt * fix? * fix? * fmt * typo * Update docs * Refactor custom protocol * fmt * move custom protocol handling to before parsing * Support describe * Add LRU for server side statement cache * rename variable * Refactoring * Move docs * Fix test * fix * Update tests * trigger build * Add more tests * Reorder handling sync * Support when a named describe is sent along with Parse (go pgx) and expecting results * don't talk to client if not needed when client sends Parse * fmt :( * refactor tests * nit * Reduce hashing * Reducing work done to decode describe and parse messages * minor refactor * Merge branch 'main' into zain/reimplment-prepared-statements-with-global-lru-cache * Rewrite extended and prepared protocol message handling to better support mocking response packets and close * An attempt to better handle if there are DDL changes that might break cached plans with ideas about how to further improve it * fix * Minor stats fixed and cleanup * Cosmetic fixes (#64) * Cosmetic fixes * fix test * Change server drop for statement cache error to a `deallocate all` * Updated comments and added new idea for handling DDL changes impacting cached plans * fix test? * Revert test change * trigger build, flakey test * Avoid potential race conditions by changing get_or_insert to promote for pool LRU * remove ps enabled variable on the server in favor of using an option * Add close to the Extended Protocol buffer --------- Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>
2023-10-25 18:11:57 -04:00
None => (default, default, crate::config::Pool::default(), 0),
};
let manager = ServerPool::new(
self.address.clone(),
self.user.clone(),
self.database.as_str(),
ClientServerMap::default(),
Auth passthrough (auth_query) (#266) * Add a new exec_simple_query method This adds a new `exec_simple_query` method so we can make 'out of band' queries to servers that don't interfere with pools at all. In order to reuse startup code for making these simple queries, we need to set the stats (`Reporter`) optional, so using these simple queries wont interfere with stats. * Add auth passthough (auth_query) Adds a feature that allows setting auth passthrough for md5 auth. It adds 3 new (general and pool) config parameters: - `auth_query`: An string containing a query that will be executed on boot to obtain the hash of a given user. This query have to use a placeholder `$1`, so pgcat can replace it with the user its trying to fetch the hash from. - `auth_query_user`: The user to use for connecting to the server and executing the auth_query. - `auth_query_password`: The password to use for connecting to the server and executing the auth_query. The configuration can be done either on the general config (so pools share them) or in a per-pool basis. The behavior is, at boot time, when validating server connections, a hash is fetched per server and stored in the pool. When new server connections are created, and no cleartext password is specified, the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries it. When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no hash (we could not obtain one when validating the connection), a new fetch is tried. Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure we refetch the hash and retry auth (so password changes can be done). The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be obtained during connection validation, or the password has change, we can still connect later. * Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
Arc::new(RwLock::new(None)),
None,
true,
false,
Reimplement prepared statements with LRU cache and statement deduplication (#618) * Initial commit * Cleanup and add stats * Use an arc instead of full clones to store the parse packets * Use mutex instead * fmt * clippy * fmt * fix? * fix? * fmt * typo * Update docs * Refactor custom protocol * fmt * move custom protocol handling to before parsing * Support describe * Add LRU for server side statement cache * rename variable * Refactoring * Move docs * Fix test * fix * Update tests * trigger build * Add more tests * Reorder handling sync * Support when a named describe is sent along with Parse (go pgx) and expecting results * don't talk to client if not needed when client sends Parse * fmt :( * refactor tests * nit * Reduce hashing * Reducing work done to decode describe and parse messages * minor refactor * Merge branch 'main' into zain/reimplment-prepared-statements-with-global-lru-cache * Rewrite extended and prepared protocol message handling to better support mocking response packets and close * An attempt to better handle if there are DDL changes that might break cached plans with ideas about how to further improve it * fix * Minor stats fixed and cleanup * Cosmetic fixes (#64) * Cosmetic fixes * fix test * Change server drop for statement cache error to a `deallocate all` * Updated comments and added new idea for handling DDL changes impacting cached plans * fix test? * Revert test change * trigger build, flakey test * Avoid potential race conditions by changing get_or_insert to promote for pool LRU * remove ps enabled variable on the server in favor of using an option * Add close to the Extended Protocol buffer --------- Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>
2023-10-25 18:11:57 -04:00
prepared_statement_cache_size,
);
Pool::builder()
.max_size(1)
.connection_timeout(std::time::Duration::from_millis(connection_timeout))
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
.test_on_check_out(false)
.build(manager)
.await
.unwrap()
}
pub fn start(mut self) {
tokio::spawn(async move {
let pool = self.create_pool().await;
let address = self.address.clone();
loop {
let mut server = match pool.get().await {
Ok(server) => server,
Err(err) => {
error!(
"Failed to get connection from pool, Discarding message {:?}, {:?}",
err,
address.clone()
);
continue;
}
};
tokio::select! {
// Exit channel events
_ = self.disconnect_rx.recv() => {
info!("Got mirror exit signal, exiting {:?}", address.clone());
break;
}
// Incoming data from server (we read to clear the socket buffer and discard the data)
recv_result = server.recv(None) => {
match recv_result {
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()).as_str()
);
}
}
}
// Messages to send to the server
message = self.bytes_rx.recv() => {
match message {
Some(bytes) => {
match server.send(&BytesMut::from(&bytes[..])).await {
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()),
Err(err) => {
server.mark_bad(
format!("Failed to receive from mirror {:?} {:?}", err, address.clone()).as_str()
);
}
}
}
None => {
info!("Mirror channel closed, exiting {:?}", address.clone());
break;
},
}
}
}
}
});
}
}
pub struct MirroringManager {
pub byte_senders: Vec<Sender<Bytes>>,
pub disconnect_senders: Vec<Sender<()>>,
}
impl MirroringManager {
pub fn from_addresses(
user: User,
database: String,
addresses: Vec<Address>,
) -> MirroringManager {
let mut byte_senders: Vec<Sender<Bytes>> = vec![];
let mut exit_senders: Vec<Sender<()>> = vec![];
addresses.iter().for_each(|mirror| {
let (bytes_tx, bytes_rx) = channel::<Bytes>(10);
let (exit_tx, exit_rx) = channel::<()>(1);
let mut addr = mirror.clone();
addr.role = Role::Mirror;
let client = MirroredClient {
user: user.clone(),
database: database.to_owned(),
address: addr,
bytes_rx,
disconnect_rx: exit_rx,
};
exit_senders.push(exit_tx);
byte_senders.push(bytes_tx);
client.start();
});
Self {
byte_senders,
disconnect_senders: exit_senders,
}
}
pub fn send(&mut self, bytes: &BytesMut) {
// We want to avoid performing an allocation if we won't be able to send the message
// There is a possibility of a race here where we check the capacity and then the channel is
// closed or the capacity is reduced to 0, but mirroring is best effort anyway
if self
.byte_senders
.iter()
.all(|sender| sender.capacity() == 0 || sender.is_closed())
{
return;
}
let immutable_bytes = bytes.clone().freeze();
self.byte_senders.iter_mut().for_each(|sender| {
match sender.try_send(immutable_bytes.clone()) {
Ok(_) => {}
Err(err) => {
warn!("Failed to send bytes to a mirror channel {}", err);
}
}
});
}
pub fn disconnect(&mut self) {
self.disconnect_senders
.iter_mut()
.for_each(|sender| match sender.try_send(()) {
Ok(_) => {}
Err(err) => {
warn!(
"Failed to send disconnect signal to a mirror channel {}",
err
);
}
});
}
}