2023-04-10 14:51:01 -07:00
|
|
|
use crate::errors::{ClientIdentifier, Error};
|
2023-03-06 06:10:59 -06:00
|
|
|
use crate::pool::BanReason;
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Handle clients by pretending to be a PostgreSQL server.
|
2022-02-04 09:28:52 -08:00
|
|
|
use bytes::{Buf, BufMut, BytesMut};
|
2022-09-23 11:08:38 -05:00
|
|
|
use log::{debug, error, info, trace, warn};
|
2023-06-16 12:57:44 -07:00
|
|
|
use once_cell::sync::Lazy;
|
2023-10-25 18:11:57 -04:00
|
|
|
use std::collections::{HashMap, VecDeque};
|
2023-06-16 12:57:44 -07:00
|
|
|
use std::sync::{atomic::AtomicUsize, Arc};
|
2022-09-15 01:21:45 -05:00
|
|
|
use std::time::Instant;
|
2022-06-27 16:45:41 -07:00
|
|
|
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
|
|
|
|
|
use tokio::net::TcpStream;
|
2022-08-08 19:01:24 -04:00
|
|
|
use tokio::sync::broadcast::Receiver;
|
2022-08-25 06:40:56 -07:00
|
|
|
use tokio::sync::mpsc::Sender;
|
2022-02-03 13:35:40 -08:00
|
|
|
|
2023-08-10 11:18:46 -04:00
|
|
|
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
2023-04-10 14:51:01 -07:00
|
|
|
use crate::auth_passthrough::refetch_auth_hash;
|
2023-10-25 18:11:57 -04:00
|
|
|
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
2022-02-15 22:45:45 -08:00
|
|
|
use crate::constants::*;
|
2022-02-03 13:35:40 -08:00
|
|
|
use crate::messages::*;
|
2023-05-03 09:13:05 -07:00
|
|
|
use crate::plugins::PluginOutput;
|
2022-09-23 02:00:46 -04:00
|
|
|
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
|
2022-02-19 08:57:24 -08:00
|
|
|
use crate::query_router::{Command, QueryRouter};
|
2023-08-10 11:18:46 -04:00
|
|
|
use crate::server::{Server, ServerParameters};
|
2023-05-23 08:44:49 -05:00
|
|
|
use crate::stats::{ClientStats, ServerStats};
|
2022-06-27 16:45:41 -07:00
|
|
|
use crate::tls::Tls;
|
2022-06-27 15:52:01 -07:00
|
|
|
|
|
|
|
|
use tokio_rustls::server::TlsStream;
|
|
|
|
|
|
2023-06-16 12:57:44 -07:00
|
|
|
/// Incrementally count prepared statements.
/// Used instead of random numbers to avoid conflicts in places where the
/// random number generator is weak.
pub static PREPARED_STATEMENT_COUNTER: Lazy<Arc<AtomicUsize>> =
    Lazy::new(|| Arc::new(AtomicUsize::new(0)));
|
|
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
/// Type of connection received from client.
enum ClientConnectionType {
    /// Regular startup message (`PROTOCOL_VERSION_NUMBER`): plain-text session.
    Startup,

    /// SSLRequest (`SSL_REQUEST_CODE`): client asks to negotiate TLS first.
    Tls,

    /// CancelRequest (`CANCEL_REQUEST_CODE`): client wants to cancel a query
    /// running on another connection.
    CancelQuery,
}
|
2022-02-08 13:11:50 -08:00
|
|
|
|
2022-02-06 10:48:14 -08:00
|
|
|
/// The client state. One of these is created per client.
pub struct Client<S, T> {
    /// The reads are buffered (8K by default).
    read: BufReader<S>,

    /// We buffer the writes ourselves because we know the protocol
    /// better than a stock buffer.
    write: T,

    /// Internal buffer, where we place messages until we have to flush
    /// them to the backend.
    buffer: BytesMut,

    /// Used to buffer response messages to the client.
    response_message_queue_buffer: BytesMut,

    /// Peer (client) socket address, captured when the connection was accepted.
    addr: std::net::SocketAddr,

    /// The client was started with the sole reason to cancel another running query.
    cancel_mode: bool,

    /// In transaction mode, the connection is released after each transaction.
    /// Session mode has slightly higher throughput per client, but lower capacity.
    transaction_mode: bool,

    /// For query cancellation, the client is given a random process ID and secret on startup.
    process_id: i32,
    secret_key: i32,

    /// Clients are mapped to servers while they use them. This allows a client
    /// to connect and cancel a query.
    client_server_map: ClientServerMap,

    /// Client parameters, e.g. user, client_encoding, etc.
    #[allow(dead_code)]
    parameters: HashMap<String, String>,

    /// Statistics related to this client.
    stats: Arc<ClientStats>,

    /// Clients want to talk to admin database.
    admin: bool,

    /// Last address the client talked to.
    last_address_id: Option<usize>,

    /// Stats of the last server process we talked to.
    last_server_stats: Option<Arc<ServerStats>>,

    /// Currently connected to a server.
    connected_to_server: bool,

    /// Name of the server pool for this client (This comes from the database name in the connection string)
    pool_name: String,

    /// Postgres user for this client (This comes from the user in the connection string)
    username: String,

    /// Server startup and session parameters that we're going to track.
    server_parameters: ServerParameters,

    /// Used to notify clients about an impending shutdown.
    shutdown: Receiver<()>,

    /// Whether prepared statements are enabled for this client.
    prepared_statements_enabled: bool,

    /// Mapping of client named prepared statement to rewritten parse messages.
    prepared_statements: HashMap<String, (Arc<Parse>, u64)>,

    /// Buffered extended protocol data.
    extended_protocol_data_buffer: VecDeque<ExtendedProtocolData>,
}
|
|
|
|
|
|
2022-06-27 16:45:41 -07:00
|
|
|
/// Client entrypoint.
|
|
|
|
|
pub async fn client_entrypoint(
|
2022-06-27 15:52:01 -07:00
|
|
|
mut stream: TcpStream,
|
|
|
|
|
client_server_map: ClientServerMap,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown: Receiver<()>,
|
2022-09-05 03:21:06 -05:00
|
|
|
drain: Sender<i32>,
|
2022-08-25 06:40:56 -07:00
|
|
|
admin_only: bool,
|
2022-11-16 22:15:47 -08:00
|
|
|
tls_certificate: Option<String>,
|
|
|
|
|
log_client_connections: bool,
|
2022-06-27 15:52:01 -07:00
|
|
|
) -> Result<(), Error> {
|
2022-06-27 16:45:41 -07:00
|
|
|
// Figure out if the client wants TLS or not.
|
2023-09-01 06:11:38 +09:00
|
|
|
let addr = match stream.peer_addr() {
|
|
|
|
|
Ok(addr) => addr,
|
|
|
|
|
Err(err) => {
|
|
|
|
|
return Err(Error::SocketError(format!(
|
|
|
|
|
"Failed to get peer address: {:?}",
|
|
|
|
|
err
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
};
|
2022-06-27 16:45:41 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
match get_startup::<TcpStream>(&mut stream).await {
|
2022-06-27 16:45:41 -07:00
|
|
|
// Client requested a TLS connection.
|
|
|
|
|
Ok((ClientConnectionType::Tls, _)) => {
|
|
|
|
|
// TLS settings are configured, will setup TLS now.
|
2023-10-10 09:18:21 -07:00
|
|
|
if tls_certificate.is_some() {
|
2022-06-27 16:45:41 -07:00
|
|
|
debug!("Accepting TLS request");
|
|
|
|
|
|
|
|
|
|
let mut yes = BytesMut::new();
|
|
|
|
|
yes.put_u8(b'S');
|
|
|
|
|
write_all(&mut stream, yes).await?;
|
|
|
|
|
|
|
|
|
|
// Negotiate TLS.
|
2022-08-25 06:40:56 -07:00
|
|
|
match startup_tls(stream, client_server_map, shutdown, admin_only).await {
|
2022-06-27 16:45:41 -07:00
|
|
|
Ok(mut client) => {
|
2022-11-16 22:15:47 -08:00
|
|
|
if log_client_connections {
|
|
|
|
|
info!("Client {:?} connected (TLS)", addr);
|
|
|
|
|
} else {
|
|
|
|
|
debug!("Client {:?} connected (TLS)", addr);
|
|
|
|
|
}
|
2022-06-27 16:45:41 -07:00
|
|
|
|
2022-08-25 06:40:56 -07:00
|
|
|
if !client.is_admin() {
|
|
|
|
|
let _ = drain.send(1).await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let result = client.handle().await;
|
|
|
|
|
|
|
|
|
|
if !client.is_admin() {
|
|
|
|
|
let _ = drain.send(-1).await;
|
2023-09-21 15:55:22 -07:00
|
|
|
}
|
2023-03-28 17:19:37 +02:00
|
|
|
|
2023-09-21 15:55:22 -07:00
|
|
|
if result.is_err() {
|
|
|
|
|
client.stats.disconnect();
|
2022-08-25 06:40:56 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result
|
2022-06-27 16:45:41 -07:00
|
|
|
}
|
|
|
|
|
Err(err) => Err(err),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// TLS is not configured, we cannot offer it.
|
|
|
|
|
else {
|
|
|
|
|
// Rejecting client request for TLS.
|
|
|
|
|
let mut no = BytesMut::new();
|
|
|
|
|
no.put_u8(b'N');
|
|
|
|
|
write_all(&mut stream, no).await?;
|
|
|
|
|
|
|
|
|
|
// Attempting regular startup. Client can disconnect now
|
|
|
|
|
// if they choose.
|
|
|
|
|
match get_startup::<TcpStream>(&mut stream).await {
|
|
|
|
|
// Client accepted unencrypted connection.
|
|
|
|
|
Ok((ClientConnectionType::Startup, bytes)) => {
|
|
|
|
|
let (read, write) = split(stream);
|
|
|
|
|
|
|
|
|
|
// Continue with regular startup.
|
2022-08-08 19:01:24 -04:00
|
|
|
match Client::startup(
|
|
|
|
|
read,
|
|
|
|
|
write,
|
|
|
|
|
addr,
|
|
|
|
|
bytes,
|
|
|
|
|
client_server_map,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown,
|
|
|
|
|
admin_only,
|
2022-08-08 19:01:24 -04:00
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
2022-06-27 16:45:41 -07:00
|
|
|
Ok(mut client) => {
|
2022-11-16 22:15:47 -08:00
|
|
|
if log_client_connections {
|
|
|
|
|
info!("Client {:?} connected (plain)", addr);
|
|
|
|
|
} else {
|
|
|
|
|
debug!("Client {:?} connected (plain)", addr);
|
|
|
|
|
}
|
2022-06-27 16:45:41 -07:00
|
|
|
|
2022-08-25 06:40:56 -07:00
|
|
|
if !client.is_admin() {
|
|
|
|
|
let _ = drain.send(1).await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let result = client.handle().await;
|
|
|
|
|
|
|
|
|
|
if !client.is_admin() {
|
|
|
|
|
let _ = drain.send(-1).await;
|
2023-09-21 15:55:22 -07:00
|
|
|
}
|
2023-03-28 17:19:37 +02:00
|
|
|
|
2023-09-21 15:55:22 -07:00
|
|
|
if result.is_err() {
|
|
|
|
|
client.stats.disconnect();
|
2022-08-25 06:40:56 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result
|
2022-06-27 16:45:41 -07:00
|
|
|
}
|
|
|
|
|
Err(err) => Err(err),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Client probably disconnected rejecting our plain text connection.
|
2022-11-17 09:24:39 -08:00
|
|
|
Ok((ClientConnectionType::Tls, _))
|
|
|
|
|
| Ok((ClientConnectionType::CancelQuery, _)) => Err(Error::ProtocolSyncError(
|
2023-04-10 14:51:01 -07:00
|
|
|
"Bad postgres client (plain)".into(),
|
2022-11-17 09:24:39 -08:00
|
|
|
)),
|
|
|
|
|
|
|
|
|
|
Err(err) => Err(err),
|
2022-06-27 16:45:41 -07:00
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
}
|
|
|
|
|
}
|
2022-02-03 13:35:40 -08:00
|
|
|
|
2022-06-27 16:45:41 -07:00
|
|
|
// Client wants to use plain connection without encryption.
|
2022-06-27 15:52:01 -07:00
|
|
|
Ok((ClientConnectionType::Startup, bytes)) => {
|
|
|
|
|
let (read, write) = split(stream);
|
2022-06-27 16:45:41 -07:00
|
|
|
|
|
|
|
|
// Continue with regular startup.
|
2022-08-08 19:01:24 -04:00
|
|
|
match Client::startup(
|
|
|
|
|
read,
|
|
|
|
|
write,
|
|
|
|
|
addr,
|
|
|
|
|
bytes,
|
|
|
|
|
client_server_map,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown,
|
|
|
|
|
admin_only,
|
2022-08-08 19:01:24 -04:00
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
2022-06-27 16:45:41 -07:00
|
|
|
Ok(mut client) => {
|
2022-11-16 22:15:47 -08:00
|
|
|
if log_client_connections {
|
|
|
|
|
info!("Client {:?} connected (plain)", addr);
|
|
|
|
|
} else {
|
|
|
|
|
debug!("Client {:?} connected (plain)", addr);
|
|
|
|
|
}
|
2022-06-27 16:45:41 -07:00
|
|
|
|
2022-09-05 04:02:49 -04:00
|
|
|
if !client.is_admin() {
|
2022-08-25 06:40:56 -07:00
|
|
|
let _ = drain.send(1).await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let result = client.handle().await;
|
|
|
|
|
|
|
|
|
|
if !client.is_admin() {
|
|
|
|
|
let _ = drain.send(-1).await;
|
2023-09-21 15:55:22 -07:00
|
|
|
}
|
2023-03-28 17:19:37 +02:00
|
|
|
|
2023-09-21 15:55:22 -07:00
|
|
|
if result.is_err() {
|
|
|
|
|
client.stats.disconnect();
|
2022-08-25 06:40:56 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result
|
2022-06-27 16:45:41 -07:00
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
Err(err) => Err(err),
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-02-22 19:26:08 -08:00
|
|
|
|
2022-06-27 16:45:41 -07:00
|
|
|
// Client wants to cancel a query.
|
2022-06-27 15:52:01 -07:00
|
|
|
Ok((ClientConnectionType::CancelQuery, bytes)) => {
|
2022-06-27 16:45:41 -07:00
|
|
|
let (read, write) = split(stream);
|
|
|
|
|
|
|
|
|
|
// Continue with cancel query request.
|
2022-08-25 06:40:56 -07:00
|
|
|
match Client::cancel(read, write, addr, bytes, client_server_map, shutdown).await {
|
2022-06-27 16:45:41 -07:00
|
|
|
Ok(mut client) => {
|
|
|
|
|
info!("Client {:?} issued a cancel query request", addr);
|
|
|
|
|
|
2022-09-05 04:02:49 -04:00
|
|
|
if !client.is_admin() {
|
2022-08-25 06:40:56 -07:00
|
|
|
let _ = drain.send(1).await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let result = client.handle().await;
|
|
|
|
|
|
|
|
|
|
if !client.is_admin() {
|
|
|
|
|
let _ = drain.send(-1).await;
|
2023-09-21 15:55:22 -07:00
|
|
|
}
|
2022-08-25 06:40:56 -07:00
|
|
|
|
2023-09-21 15:55:22 -07:00
|
|
|
if result.is_err() {
|
|
|
|
|
client.stats.disconnect();
|
2023-03-28 17:19:37 +02:00
|
|
|
}
|
2023-09-21 15:55:22 -07:00
|
|
|
|
2022-08-25 06:40:56 -07:00
|
|
|
result
|
2022-06-27 16:45:41 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Err(err) => Err(err),
|
|
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
}
|
2022-02-03 13:35:40 -08:00
|
|
|
|
2022-06-27 16:45:41 -07:00
|
|
|
// Something failed, probably the socket.
|
2022-06-27 15:52:01 -07:00
|
|
|
Err(err) => Err(err),
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-02-03 13:35:40 -08:00
|
|
|
|
2022-06-27 16:45:41 -07:00
|
|
|
/// Handle the first message the client sends.
|
2022-06-27 15:52:01 -07:00
|
|
|
async fn get_startup<S>(stream: &mut S) -> Result<(ClientConnectionType, BytesMut), Error>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncRead + std::marker::Unpin + tokio::io::AsyncWrite,
|
|
|
|
|
{
|
|
|
|
|
// Get startup message length.
|
|
|
|
|
let len = match stream.read_i32().await {
|
|
|
|
|
Ok(len) => len,
|
|
|
|
|
Err(_) => return Err(Error::ClientBadStartup),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Get the rest of the message.
|
|
|
|
|
let mut startup = vec![0u8; len as usize - 4];
|
|
|
|
|
match stream.read_exact(&mut startup).await {
|
|
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(_) => return Err(Error::ClientBadStartup),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut bytes = BytesMut::from(&startup[..]);
|
|
|
|
|
let code = bytes.get_i32();
|
|
|
|
|
|
|
|
|
|
match code {
|
|
|
|
|
// Client is requesting SSL (TLS).
|
|
|
|
|
SSL_REQUEST_CODE => Ok((ClientConnectionType::Tls, bytes)),
|
|
|
|
|
|
|
|
|
|
// Client wants to use plain text, requesting regular startup.
|
|
|
|
|
PROTOCOL_VERSION_NUMBER => Ok((ClientConnectionType::Startup, bytes)),
|
|
|
|
|
|
|
|
|
|
// Client is requesting to cancel a running query (plain text connection).
|
|
|
|
|
CANCEL_REQUEST_CODE => Ok((ClientConnectionType::CancelQuery, bytes)),
|
2022-06-27 16:45:41 -07:00
|
|
|
|
|
|
|
|
// Something else, probably something is wrong and it's not our fault,
|
|
|
|
|
// e.g. badly implemented Postgres client.
|
2022-11-17 09:24:39 -08:00
|
|
|
_ => Err(Error::ProtocolSyncError(format!(
|
|
|
|
|
"Unexpected startup code: {}",
|
|
|
|
|
code
|
|
|
|
|
))),
|
2022-06-27 15:52:01 -07:00
|
|
|
}
|
|
|
|
|
}
|
2022-02-04 09:28:52 -08:00
|
|
|
|
2022-08-11 17:42:40 -04:00
|
|
|
/// Handle TLS connection negotiation.
|
2022-06-27 15:52:01 -07:00
|
|
|
pub async fn startup_tls(
|
2022-06-27 16:45:41 -07:00
|
|
|
stream: TcpStream,
|
2022-06-27 15:52:01 -07:00
|
|
|
client_server_map: ClientServerMap,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown: Receiver<()>,
|
|
|
|
|
admin_only: bool,
|
2022-06-27 15:52:01 -07:00
|
|
|
) -> Result<Client<ReadHalf<TlsStream<TcpStream>>, WriteHalf<TlsStream<TcpStream>>>, Error> {
|
|
|
|
|
// Negotiate TLS.
|
2022-06-27 16:45:41 -07:00
|
|
|
let tls = Tls::new()?;
|
2023-08-25 13:33:39 -04:00
|
|
|
let addr = match stream.peer_addr() {
|
|
|
|
|
Ok(addr) => addr,
|
|
|
|
|
Err(err) => {
|
|
|
|
|
return Err(Error::SocketError(format!(
|
|
|
|
|
"Failed to get peer address: {:?}",
|
|
|
|
|
err
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
};
|
2022-06-27 16:45:41 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
let mut stream = match tls.acceptor.accept(stream).await {
|
|
|
|
|
Ok(stream) => stream,
|
2022-06-27 16:45:41 -07:00
|
|
|
|
2022-08-11 17:42:40 -04:00
|
|
|
// TLS negotiation failed.
|
2022-06-27 16:45:41 -07:00
|
|
|
Err(err) => {
|
|
|
|
|
error!("TLS negotiation failed: {:?}", err);
|
2022-06-27 17:01:40 -07:00
|
|
|
return Err(Error::TlsError);
|
2022-06-27 16:45:41 -07:00
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
};
|
|
|
|
|
|
2022-08-11 17:42:40 -04:00
|
|
|
// TLS negotiation successful.
|
2022-06-27 16:45:41 -07:00
|
|
|
// Continue with regular startup using encrypted connection.
|
2022-06-27 15:52:01 -07:00
|
|
|
match get_startup::<TlsStream<TcpStream>>(&mut stream).await {
|
2022-06-27 16:45:41 -07:00
|
|
|
// Got good startup message, proceeding like normal except we
|
|
|
|
|
// are encrypted now.
|
2022-06-27 15:52:01 -07:00
|
|
|
Ok((ClientConnectionType::Startup, bytes)) => {
|
|
|
|
|
let (read, write) = split(stream);
|
2022-06-27 16:45:41 -07:00
|
|
|
|
2022-08-08 19:01:24 -04:00
|
|
|
Client::startup(
|
|
|
|
|
read,
|
|
|
|
|
write,
|
|
|
|
|
addr,
|
|
|
|
|
bytes,
|
|
|
|
|
client_server_map,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown,
|
|
|
|
|
admin_only,
|
2022-08-08 19:01:24 -04:00
|
|
|
)
|
|
|
|
|
.await
|
2022-06-27 15:52:01 -07:00
|
|
|
}
|
2022-06-27 16:45:41 -07:00
|
|
|
|
|
|
|
|
// Bad Postgres client.
|
2023-04-10 14:51:01 -07:00
|
|
|
Ok((ClientConnectionType::Tls, _)) | Ok((ClientConnectionType::CancelQuery, _)) => {
|
|
|
|
|
Err(Error::ProtocolSyncError("Bad postgres client (tls)".into()))
|
|
|
|
|
}
|
2022-11-17 09:24:39 -08:00
|
|
|
|
|
|
|
|
Err(err) => Err(err),
|
2022-06-27 15:52:01 -07:00
|
|
|
}
|
|
|
|
|
}
|
2022-02-04 09:28:52 -08:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
impl<S, T> Client<S, T>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncRead + std::marker::Unpin,
|
|
|
|
|
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-08-25 06:40:56 -07:00
|
|
|
    /// Whether this client is talking to the virtual admin database.
    pub fn is_admin(&self) -> bool {
        self.admin
    }
|
|
|
|
|
|
2022-06-27 16:45:41 -07:00
|
|
|
/// Handle Postgres client startup after TLS negotiation is complete
|
|
|
|
|
/// or over plain text.
|
|
|
|
|
pub async fn startup(
|
2022-06-27 15:52:01 -07:00
|
|
|
mut read: S,
|
|
|
|
|
mut write: T,
|
2022-06-27 16:45:41 -07:00
|
|
|
addr: std::net::SocketAddr,
|
2022-06-27 15:52:01 -07:00
|
|
|
bytes: BytesMut, // The rest of the startup message.
|
|
|
|
|
client_server_map: ClientServerMap,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown: Receiver<()>,
|
|
|
|
|
admin_only: bool,
|
2022-06-27 15:52:01 -07:00
|
|
|
) -> Result<Client<S, T>, Error> {
|
|
|
|
|
let parameters = parse_startup(bytes.clone())?;
|
2022-08-25 06:40:56 -07:00
|
|
|
|
2022-11-16 18:49:04 -08:00
|
|
|
// This parameter is mandatory by the protocol.
|
2022-08-25 06:40:56 -07:00
|
|
|
let username = match parameters.get("user") {
|
2022-07-27 21:47:55 -05:00
|
|
|
Some(user) => user,
|
2022-11-17 09:24:39 -08:00
|
|
|
None => {
|
|
|
|
|
return Err(Error::ClientError(
|
2023-04-10 14:51:01 -07:00
|
|
|
"Missing user parameter on client startup".into(),
|
2022-11-17 09:24:39 -08:00
|
|
|
))
|
|
|
|
|
}
|
2022-07-27 21:47:55 -05:00
|
|
|
};
|
|
|
|
|
|
2022-11-16 18:49:04 -08:00
|
|
|
let pool_name = match parameters.get("database") {
|
|
|
|
|
Some(db) => db,
|
|
|
|
|
None => username,
|
|
|
|
|
};
|
|
|
|
|
|
2022-09-01 22:06:55 -05:00
|
|
|
let application_name = match parameters.get("application_name") {
|
|
|
|
|
Some(application_name) => application_name,
|
|
|
|
|
None => "pgcat",
|
|
|
|
|
};
|
|
|
|
|
|
2023-10-10 09:18:21 -07:00
|
|
|
let client_identifier = ClientIdentifier::new(application_name, username, pool_name);
|
2023-04-10 14:51:01 -07:00
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
let admin = ["pgcat", "pgbouncer"]
|
|
|
|
|
.iter()
|
2022-11-10 02:04:31 +08:00
|
|
|
.filter(|db| *db == pool_name)
|
2022-07-27 21:47:55 -05:00
|
|
|
.count()
|
|
|
|
|
== 1;
|
2022-06-20 06:15:54 -07:00
|
|
|
|
2022-08-25 06:40:56 -07:00
|
|
|
// Kick any client that's not admin while we're in admin-only mode.
|
|
|
|
|
if !admin && admin_only {
|
|
|
|
|
debug!(
|
|
|
|
|
"Rejecting non-admin connection to {} when in admin only mode",
|
|
|
|
|
pool_name
|
|
|
|
|
);
|
|
|
|
|
error_response_terminal(
|
|
|
|
|
&mut write,
|
2022-11-10 02:04:31 +08:00
|
|
|
"terminating connection due to administrator command",
|
2022-08-25 06:40:56 -07:00
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
return Err(Error::ShuttingDown);
|
|
|
|
|
}
|
|
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
// Generate random backend ID and secret key
|
|
|
|
|
let process_id: i32 = rand::random();
|
|
|
|
|
let secret_key: i32 = rand::random();
|
2022-06-20 06:15:54 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
// Perform MD5 authentication.
|
|
|
|
|
// TODO: Add SASL support.
|
|
|
|
|
let salt = md5_challenge(&mut write).await?;
|
2022-06-20 06:15:54 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
let code = match read.read_u8().await {
|
|
|
|
|
Ok(p) => p,
|
2023-04-10 14:51:01 -07:00
|
|
|
Err(_) => {
|
|
|
|
|
return Err(Error::ClientSocketError(
|
|
|
|
|
"password code".into(),
|
|
|
|
|
client_identifier,
|
|
|
|
|
))
|
|
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
};
|
2022-06-20 06:15:54 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
// PasswordMessage
|
|
|
|
|
if code as char != 'p' {
|
2022-11-17 09:24:39 -08:00
|
|
|
return Err(Error::ProtocolSyncError(format!(
|
|
|
|
|
"Expected p, got {}",
|
|
|
|
|
code as char
|
|
|
|
|
)));
|
2022-06-27 15:52:01 -07:00
|
|
|
}
|
2022-06-20 06:15:54 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
let len = match read.read_i32().await {
|
|
|
|
|
Ok(len) => len,
|
2023-04-10 14:51:01 -07:00
|
|
|
Err(_) => {
|
|
|
|
|
return Err(Error::ClientSocketError(
|
|
|
|
|
"password message length".into(),
|
|
|
|
|
client_identifier,
|
|
|
|
|
))
|
|
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
};
|
2022-06-20 06:15:54 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
let mut password_response = vec![0u8; (len - 4) as usize];
|
2022-06-20 06:15:54 -07:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
match read.read_exact(&mut password_response).await {
|
|
|
|
|
Ok(_) => (),
|
2023-04-10 14:51:01 -07:00
|
|
|
Err(_) => {
|
|
|
|
|
return Err(Error::ClientSocketError(
|
|
|
|
|
"password message".into(),
|
|
|
|
|
client_identifier,
|
|
|
|
|
))
|
|
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
};
|
2022-02-04 09:28:52 -08:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
let mut prepared_statements_enabled = false;
|
|
|
|
|
|
2022-08-25 06:40:56 -07:00
|
|
|
// Authenticate admin user.
|
2023-08-10 11:18:46 -04:00
|
|
|
let (transaction_mode, mut server_parameters) = if admin {
|
2022-11-17 09:22:12 -08:00
|
|
|
let config = get_config();
|
2023-04-10 14:51:01 -07:00
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
// Compare server and client hashes.
|
2022-08-25 06:40:56 -07:00
|
|
|
let password_hash = md5_hash_password(
|
|
|
|
|
&config.general.admin_username,
|
|
|
|
|
&config.general.admin_password,
|
|
|
|
|
&salt,
|
|
|
|
|
);
|
|
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
if password_hash != password_response {
|
2023-04-10 14:51:01 -07:00
|
|
|
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
|
|
|
|
|
|
|
|
|
warn!("{}", error);
|
2022-08-25 06:40:56 -07:00
|
|
|
wrong_password(&mut write, username).await?;
|
|
|
|
|
|
2023-04-10 14:51:01 -07:00
|
|
|
return Err(error);
|
2022-07-27 21:47:55 -05:00
|
|
|
}
|
2022-08-09 14:18:27 -05:00
|
|
|
|
2023-08-10 11:18:46 -04:00
|
|
|
(false, generate_server_parameters_for_admin())
|
2022-08-25 06:40:56 -07:00
|
|
|
}
|
|
|
|
|
// Authenticate normal user.
|
|
|
|
|
else {
|
2023-10-20 02:49:05 -04:00
|
|
|
let pool = match get_pool(pool_name, username) {
|
2022-07-27 21:47:55 -05:00
|
|
|
Some(pool) => pool,
|
|
|
|
|
None => {
|
|
|
|
|
error_response(
|
|
|
|
|
&mut write,
|
|
|
|
|
&format!(
|
2023-09-11 16:39:31 -07:00
|
|
|
"No pool configured for database: {:?}, user: {:?}",
|
2022-08-25 06:40:56 -07:00
|
|
|
pool_name, username
|
2022-07-27 21:47:55 -05:00
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
2022-08-25 06:40:56 -07:00
|
|
|
|
2023-04-10 14:51:01 -07:00
|
|
|
return Err(Error::ClientGeneralError(
|
|
|
|
|
"Invalid pool name".into(),
|
|
|
|
|
client_identifier,
|
|
|
|
|
));
|
2022-07-27 21:47:55 -05:00
|
|
|
}
|
|
|
|
|
};
|
2022-08-25 06:40:56 -07:00
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
|
|
|
|
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
|
|
|
|
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
|
|
|
|
let password_hash = if let Some(password) = &pool.settings.user.password {
|
|
|
|
|
Some(md5_hash_password(username, password, &salt))
|
|
|
|
|
} else {
|
|
|
|
|
if !get_config().is_auth_query_configured() {
|
2023-04-30 09:41:46 -07:00
|
|
|
wrong_password(&mut write, username).await?;
|
2023-04-10 14:51:01 -07:00
|
|
|
return Err(Error::ClientAuthImpossible(username.into()));
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
}
|
2022-02-03 13:35:40 -08:00
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
let mut hash = (*pool.auth_hash.read()).clone();
|
2022-08-25 06:40:56 -07:00
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
if hash.is_none() {
|
2023-04-10 14:51:01 -07:00
|
|
|
warn!(
|
|
|
|
|
"Query auth configured \
|
|
|
|
|
but no hash password found \
|
|
|
|
|
for pool {}. Will try to refetch it.",
|
|
|
|
|
pool_name
|
|
|
|
|
);
|
|
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
match refetch_auth_hash(&pool).await {
|
|
|
|
|
Ok(fetched_hash) => {
|
2023-04-10 14:51:01 -07:00
|
|
|
warn!("Password for {}, obtained. Updating.", client_identifier);
|
|
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
{
|
|
|
|
|
let mut pool_auth_hash = pool.auth_hash.write();
|
|
|
|
|
*pool_auth_hash = Some(fetched_hash.clone());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hash = Some(fetched_hash);
|
|
|
|
|
}
|
2023-04-10 14:51:01 -07:00
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
Err(err) => {
|
2023-04-30 09:41:46 -07:00
|
|
|
wrong_password(&mut write, username).await?;
|
|
|
|
|
|
2023-04-10 14:51:01 -07:00
|
|
|
return Err(Error::ClientAuthPassthroughError(
|
|
|
|
|
err.to_string(),
|
|
|
|
|
client_identifier,
|
|
|
|
|
));
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Once we have the resulting hash, we compare with what the client gave us.
|
|
|
|
|
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
|
|
|
|
// to see if the password has changed since the pool was created.
|
|
|
|
|
//
|
|
|
|
|
// @TODO: we could end up fetching again the same password twice (see above).
|
|
|
|
|
if password_hash.unwrap() != password_response {
|
2023-04-10 14:51:01 -07:00
|
|
|
warn!(
|
|
|
|
|
"Invalid password {}, will try to refetch it.",
|
|
|
|
|
client_identifier
|
|
|
|
|
);
|
|
|
|
|
|
2023-04-30 09:41:46 -07:00
|
|
|
let fetched_hash = match refetch_auth_hash(&pool).await {
|
|
|
|
|
Ok(fetched_hash) => fetched_hash,
|
|
|
|
|
Err(err) => {
|
|
|
|
|
wrong_password(&mut write, username).await?;
|
|
|
|
|
|
|
|
|
|
return Err(err);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
|
|
|
|
|
|
|
|
|
// Ok password changed in server an auth is possible.
|
|
|
|
|
if new_password_hash == password_response {
|
2023-04-10 14:51:01 -07:00
|
|
|
warn!(
|
|
|
|
|
"Password for {}, changed in server. Updating.",
|
|
|
|
|
client_identifier
|
|
|
|
|
);
|
|
|
|
|
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
{
|
|
|
|
|
let mut pool_auth_hash = pool.auth_hash.write();
|
|
|
|
|
*pool_auth_hash = Some(fetched_hash);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
wrong_password(&mut write, username).await?;
|
2023-04-10 14:51:01 -07:00
|
|
|
return Err(Error::ClientGeneralError(
|
|
|
|
|
"Invalid password".into(),
|
|
|
|
|
client_identifier,
|
|
|
|
|
));
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
}
|
2022-07-27 21:47:55 -05:00
|
|
|
}
|
2022-08-25 06:40:56 -07:00
|
|
|
|
|
|
|
|
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
2023-10-25 18:11:57 -04:00
|
|
|
prepared_statements_enabled =
|
|
|
|
|
transaction_mode && pool.prepared_statement_cache.is_some();
|
2022-08-25 06:40:56 -07:00
|
|
|
|
2023-01-28 15:36:35 -08:00
|
|
|
// If the pool hasn't been validated yet,
|
|
|
|
|
// connect to the servers and figure out what's what.
|
|
|
|
|
if !pool.validated() {
|
|
|
|
|
match pool.validate().await {
|
|
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error_response(
|
|
|
|
|
&mut write,
|
|
|
|
|
&format!(
|
|
|
|
|
"Pool down for database: {:?}, user: {:?}",
|
|
|
|
|
pool_name, username
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
return Err(Error::ClientError(format!("Pool down: {:?}", err)));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-10 11:18:46 -04:00
|
|
|
(transaction_mode, pool.server_parameters())
|
2022-07-31 21:52:23 -05:00
|
|
|
};
|
2022-06-27 15:52:01 -07:00
|
|
|
|
2023-08-10 11:18:46 -04:00
|
|
|
// Update the parameters to merge what the application sent and what's originally on the server
|
|
|
|
|
server_parameters.set_from_hashmap(¶meters, false);
|
|
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
debug!("Password authentication successful");
|
|
|
|
|
|
|
|
|
|
auth_ok(&mut write).await?;
|
2023-08-10 11:18:46 -04:00
|
|
|
write_all(&mut write, (&server_parameters).into()).await?;
|
2022-06-27 15:52:01 -07:00
|
|
|
backend_key_data(&mut write, process_id, secret_key).await?;
|
2023-10-25 18:11:57 -04:00
|
|
|
send_ready_for_query(&mut write).await?;
|
2022-06-27 15:52:01 -07:00
|
|
|
|
|
|
|
|
trace!("Startup OK");
|
2023-03-28 17:19:37 +02:00
|
|
|
let stats = Arc::new(ClientStats::new(
|
|
|
|
|
process_id,
|
|
|
|
|
application_name,
|
|
|
|
|
username,
|
|
|
|
|
pool_name,
|
|
|
|
|
tokio::time::Instant::now(),
|
|
|
|
|
));
|
2022-06-27 15:52:01 -07:00
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
Ok(Client {
|
2022-06-27 15:52:01 -07:00
|
|
|
read: BufReader::new(read),
|
2022-11-10 02:04:31 +08:00
|
|
|
write,
|
2022-06-27 15:52:01 -07:00
|
|
|
buffer: BytesMut::with_capacity(8196),
|
2023-10-25 18:11:57 -04:00
|
|
|
response_message_queue_buffer: BytesMut::with_capacity(8196),
|
|
|
|
|
addr,
|
2022-06-27 15:52:01 -07:00
|
|
|
cancel_mode: false,
|
2022-08-25 06:40:56 -07:00
|
|
|
transaction_mode,
|
|
|
|
|
process_id,
|
|
|
|
|
secret_key,
|
|
|
|
|
client_server_map,
|
2022-07-27 21:47:55 -05:00
|
|
|
parameters: parameters.clone(),
|
2022-11-10 02:04:31 +08:00
|
|
|
stats,
|
|
|
|
|
admin,
|
2022-06-27 15:52:01 -07:00
|
|
|
last_address_id: None,
|
2023-03-28 17:19:37 +02:00
|
|
|
last_server_stats: None,
|
2023-10-25 18:11:57 -04:00
|
|
|
connected_to_server: false,
|
2022-08-25 06:40:56 -07:00
|
|
|
pool_name: pool_name.clone(),
|
|
|
|
|
username: username.clone(),
|
2023-08-10 11:18:46 -04:00
|
|
|
server_parameters,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown,
|
2023-10-25 18:11:57 -04:00
|
|
|
prepared_statements_enabled,
|
2023-06-16 12:57:44 -07:00
|
|
|
prepared_statements: HashMap::new(),
|
2023-10-25 18:11:57 -04:00
|
|
|
extended_protocol_data_buffer: VecDeque::new(),
|
2022-11-10 02:04:31 +08:00
|
|
|
})
|
2022-02-03 13:35:40 -08:00
|
|
|
}
|
2022-02-03 13:54:07 -08:00
|
|
|
|
2022-06-27 16:45:41 -07:00
|
|
|
/// Handle cancel request.
|
|
|
|
|
pub async fn cancel(
|
|
|
|
|
read: S,
|
|
|
|
|
write: T,
|
|
|
|
|
addr: std::net::SocketAddr,
|
|
|
|
|
mut bytes: BytesMut, // The rest of the startup message.
|
|
|
|
|
client_server_map: ClientServerMap,
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown: Receiver<()>,
|
2022-06-27 16:45:41 -07:00
|
|
|
) -> Result<Client<S, T>, Error> {
|
|
|
|
|
let process_id = bytes.get_i32();
|
|
|
|
|
let secret_key = bytes.get_i32();
|
2022-11-10 02:04:31 +08:00
|
|
|
Ok(Client {
|
2022-06-27 16:45:41 -07:00
|
|
|
read: BufReader::new(read),
|
2022-11-10 02:04:31 +08:00
|
|
|
write,
|
2022-06-27 16:45:41 -07:00
|
|
|
buffer: BytesMut::with_capacity(8196),
|
2023-10-25 18:11:57 -04:00
|
|
|
response_message_queue_buffer: BytesMut::with_capacity(8196),
|
|
|
|
|
addr,
|
2022-06-27 16:45:41 -07:00
|
|
|
cancel_mode: true,
|
2022-07-27 21:47:55 -05:00
|
|
|
transaction_mode: false,
|
2022-08-25 06:40:56 -07:00
|
|
|
process_id,
|
|
|
|
|
secret_key,
|
|
|
|
|
client_server_map,
|
2022-06-27 16:45:41 -07:00
|
|
|
parameters: HashMap::new(),
|
2023-03-28 17:19:37 +02:00
|
|
|
stats: Arc::new(ClientStats::default()),
|
2022-06-27 16:45:41 -07:00
|
|
|
admin: false,
|
|
|
|
|
last_address_id: None,
|
2023-03-28 17:19:37 +02:00
|
|
|
last_server_stats: None,
|
2023-10-25 18:11:57 -04:00
|
|
|
connected_to_server: false,
|
2022-08-25 06:40:56 -07:00
|
|
|
pool_name: String::from("undefined"),
|
|
|
|
|
username: String::from("undefined"),
|
2023-08-10 11:18:46 -04:00
|
|
|
server_parameters: ServerParameters::new(),
|
2022-08-25 06:40:56 -07:00
|
|
|
shutdown,
|
2023-10-25 18:11:57 -04:00
|
|
|
prepared_statements_enabled: false,
|
2023-06-16 12:57:44 -07:00
|
|
|
prepared_statements: HashMap::new(),
|
2023-10-25 18:11:57 -04:00
|
|
|
extended_protocol_data_buffer: VecDeque::new(),
|
2022-11-10 02:04:31 +08:00
|
|
|
})
|
2022-06-27 16:45:41 -07:00
|
|
|
}
|
2022-06-27 15:52:01 -07:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Handle a connected and authenticated client.
|
2022-06-24 14:52:38 -07:00
|
|
|
pub async fn handle(&mut self) -> Result<(), Error> {
|
2022-02-15 22:45:45 -08:00
|
|
|
// The client wants to cancel a query it has issued previously.
|
2022-02-04 09:28:52 -08:00
|
|
|
if self.cancel_mode {
|
2022-02-24 08:44:41 -08:00
|
|
|
trace!("Sending CancelRequest");
|
2022-02-22 19:26:08 -08:00
|
|
|
|
2022-02-05 10:02:13 -08:00
|
|
|
let (process_id, secret_key, address, port) = {
|
2022-02-24 08:44:41 -08:00
|
|
|
let guard = self.client_server_map.lock();
|
2022-02-15 22:45:45 -08:00
|
|
|
|
2022-02-04 16:01:35 -08:00
|
|
|
match guard.get(&(self.process_id, self.secret_key)) {
|
2022-02-06 10:48:14 -08:00
|
|
|
// Drop the mutex as soon as possible.
|
2022-02-15 22:45:45 -08:00
|
|
|
// We found the server the client is using for its query
|
|
|
|
|
// that it wants to cancel.
|
2022-11-10 02:04:31 +08:00
|
|
|
Some((process_id, secret_key, address, port)) => {
|
|
|
|
|
(*process_id, *secret_key, address.clone(), *port)
|
|
|
|
|
}
|
2022-02-15 22:45:45 -08:00
|
|
|
|
|
|
|
|
// The client doesn't know / got the wrong server,
|
|
|
|
|
// we're closing the connection for security reasons.
|
2022-02-04 16:01:35 -08:00
|
|
|
None => return Ok(()),
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Opens a new separate connection to the server, sends the backend_id
|
|
|
|
|
// and secret_key and then closes it for security reasons. No other interactions
|
|
|
|
|
// take place.
|
2022-11-10 02:04:31 +08:00
|
|
|
return Server::cancel(&address, port, process_id, secret_key).await;
|
2022-02-04 09:28:52 -08:00
|
|
|
}
|
|
|
|
|
|
2022-06-24 14:52:38 -07:00
|
|
|
// The query router determines where the query is going to go,
|
|
|
|
|
// e.g. primary, replica, which shard.
|
2022-08-09 14:18:27 -05:00
|
|
|
let mut query_router = QueryRouter::new();
|
2023-03-28 17:19:37 +02:00
|
|
|
|
|
|
|
|
self.stats.register(self.stats.clone());
|
2022-02-19 13:57:35 -08:00
|
|
|
|
2023-05-03 09:13:05 -07:00
|
|
|
// Result returned by one of the plugins.
|
|
|
|
|
let mut plugin_output = None;
|
|
|
|
|
|
2023-08-10 11:18:46 -04:00
|
|
|
let client_identifier = ClientIdentifier::new(
|
2023-10-10 09:18:21 -07:00
|
|
|
self.server_parameters.get_application_name(),
|
2023-08-10 11:18:46 -04:00
|
|
|
&self.username,
|
|
|
|
|
&self.pool_name,
|
|
|
|
|
);
|
2023-08-08 16:10:03 -04:00
|
|
|
|
2023-10-20 02:49:05 -04:00
|
|
|
// Get a pool instance referenced by the most up-to-date
|
|
|
|
|
// pointer. This ensures we always read the latest config
|
|
|
|
|
// when starting a query.
|
|
|
|
|
let mut pool = if self.admin {
|
|
|
|
|
// Admin clients do not use pools.
|
|
|
|
|
ConnectionPool::default()
|
|
|
|
|
} else {
|
|
|
|
|
self.get_pool().await?
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
query_router.update_pool_settings(&pool.settings);
|
|
|
|
|
|
2022-02-16 22:52:11 -08:00
|
|
|
// Our custom protocol loop.
|
|
|
|
|
// We expect the client to either start a transaction with regular queries
|
2022-03-10 01:33:29 -08:00
|
|
|
// or issue commands for our sharding and server selection protocol.
|
2022-02-03 13:54:07 -08:00
|
|
|
loop {
|
2022-06-24 14:52:38 -07:00
|
|
|
trace!(
|
|
|
|
|
"Client idle, waiting for message, transaction mode: {}",
|
|
|
|
|
self.transaction_mode
|
|
|
|
|
);
|
2022-02-22 19:26:08 -08:00
|
|
|
|
2022-02-06 10:48:14 -08:00
|
|
|
// Read a complete message from the client, which normally would be
|
|
|
|
|
// either a `Q` (query) or `P` (prepare, extended protocol).
|
|
|
|
|
// We can parse it here before grabbing a server from the pool,
|
2022-03-10 01:33:29 -08:00
|
|
|
// in case the client is sending some custom protocol messages, e.g.
|
2022-02-09 06:51:31 -08:00
|
|
|
// SET SHARDING KEY TO 'bigint';
|
2022-08-08 19:01:24 -04:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
let message = tokio::select! {
|
2022-08-25 06:40:56 -07:00
|
|
|
_ = self.shutdown.recv() => {
|
|
|
|
|
if !self.admin {
|
|
|
|
|
error_response_terminal(
|
|
|
|
|
&mut self.write,
|
2022-11-10 02:04:31 +08:00
|
|
|
"terminating connection due to administrator command"
|
2022-08-25 06:40:56 -07:00
|
|
|
).await?;
|
2023-03-28 17:19:37 +02:00
|
|
|
|
2023-04-10 14:51:01 -07:00
|
|
|
self.stats.disconnect();
|
|
|
|
|
return Ok(());
|
2022-08-25 06:40:56 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Admin clients ignore shutdown.
|
|
|
|
|
else {
|
|
|
|
|
read_message(&mut self.read).await?
|
|
|
|
|
}
|
2022-08-08 19:01:24 -04:00
|
|
|
},
|
|
|
|
|
message_result = read_message(&mut self.read) => message_result?
|
|
|
|
|
};
|
2022-02-06 10:48:14 -08:00
|
|
|
|
2023-08-16 13:08:48 -04:00
|
|
|
if message[0] as char == 'X' {
|
|
|
|
|
debug!("Client disconnecting");
|
|
|
|
|
|
|
|
|
|
self.stats.disconnect();
|
|
|
|
|
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
// Handle admin database queries.
|
|
|
|
|
if self.admin {
|
|
|
|
|
debug!("Handling admin command");
|
|
|
|
|
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// Handle all custom protocol commands, if any.
|
|
|
|
|
if self
|
|
|
|
|
.handle_custom_protocol(&mut query_router, &message, &pool)
|
|
|
|
|
.await?
|
|
|
|
|
{
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
let mut initial_parsed_ast = None;
|
|
|
|
|
|
2022-09-01 17:02:39 -05:00
|
|
|
match message[0] as char {
|
2023-10-25 18:11:57 -04:00
|
|
|
// Query
|
2023-03-10 07:55:22 -08:00
|
|
|
'Q' => {
|
|
|
|
|
if query_router.query_parser_enabled() {
|
2023-08-08 16:10:03 -04:00
|
|
|
match query_router.parse(&message) {
|
|
|
|
|
Ok(ast) => {
|
|
|
|
|
let plugin_result = query_router.execute_plugins(&ast).await;
|
2023-05-03 09:13:05 -07:00
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
match plugin_result {
|
|
|
|
|
Ok(PluginOutput::Deny(error)) => {
|
|
|
|
|
error_response(&mut self.write, &error).await?;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
2023-05-03 09:13:05 -07:00
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
Ok(PluginOutput::Intercept(result)) => {
|
|
|
|
|
write_all(&mut self.write, result).await?;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
2023-05-03 09:13:05 -07:00
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
_ => (),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let _ = query_router.infer(&ast);
|
2023-05-03 09:13:05 -07:00
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
initial_parsed_ast = Some(ast);
|
|
|
|
|
}
|
|
|
|
|
Err(error) => {
|
|
|
|
|
warn!(
|
|
|
|
|
"Query parsing error: {} (client: {})",
|
|
|
|
|
error, client_identifier
|
|
|
|
|
);
|
|
|
|
|
}
|
2023-05-03 09:13:05 -07:00
|
|
|
}
|
2023-03-10 07:55:22 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// Buffer extended protocol messages even if we do not have
|
|
|
|
|
// a server connection yet. Hopefully, when we get the S message
|
|
|
|
|
// we'll be able to allocate a connection. Also, clients do not expect
|
|
|
|
|
// the server to respond to these messages so even if we were not able to
|
|
|
|
|
// allocate a connection, we wouldn't be able to send back an error message
|
|
|
|
|
// to the client so we buffer them and defer the decision to error out or not
|
|
|
|
|
// to when we get the S message
|
|
|
|
|
// Parse
|
2023-03-10 07:55:22 -08:00
|
|
|
'P' => {
|
|
|
|
|
if query_router.query_parser_enabled() {
|
2023-08-08 16:10:03 -04:00
|
|
|
match query_router.parse(&message) {
|
|
|
|
|
Ok(ast) => {
|
|
|
|
|
if let Ok(output) = query_router.execute_plugins(&ast).await {
|
|
|
|
|
plugin_output = Some(output);
|
|
|
|
|
}
|
2023-05-03 09:13:05 -07:00
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
let _ = query_router.infer(&ast);
|
|
|
|
|
}
|
|
|
|
|
Err(error) => {
|
|
|
|
|
warn!(
|
|
|
|
|
"Query parsing error: {} (client: {})",
|
|
|
|
|
error, client_identifier
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
};
|
2023-03-10 07:55:22 -08:00
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.buffer_parse(message, &pool)?;
|
|
|
|
|
|
2023-03-10 07:55:22 -08:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// Bind
|
2023-03-10 07:55:22 -08:00
|
|
|
'B' => {
|
|
|
|
|
if query_router.query_parser_enabled() {
|
|
|
|
|
query_router.infer_shard_from_bind(&message);
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.buffer_bind(message).await?;
|
|
|
|
|
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Describe
|
|
|
|
|
'D' => {
|
|
|
|
|
self.buffer_describe(message).await?;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
'E' => {
|
|
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_execute(message));
|
2023-03-10 07:55:22 -08:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2023-06-18 23:02:34 -07:00
|
|
|
// Close (F)
|
|
|
|
|
'C' => {
|
2023-10-25 18:11:57 -04:00
|
|
|
let close: Close = (&message).try_into()?;
|
2023-06-18 23:02:34 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_close(message, close));
|
|
|
|
|
continue;
|
2023-06-18 23:02:34 -07:00
|
|
|
}
|
|
|
|
|
|
2022-09-01 17:02:39 -05:00
|
|
|
_ => (),
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-03 09:13:05 -07:00
|
|
|
// Check on plugin results.
|
2023-10-10 09:18:21 -07:00
|
|
|
if let Some(PluginOutput::Deny(error)) = plugin_output {
|
2023-10-25 18:11:57 -04:00
|
|
|
self.reset_buffered_state();
|
2023-10-10 09:18:21 -07:00
|
|
|
error_response(&mut self.write, &error).await?;
|
|
|
|
|
plugin_output = None;
|
|
|
|
|
continue;
|
2023-05-03 09:13:05 -07:00
|
|
|
};
|
|
|
|
|
|
2023-01-28 15:36:35 -08:00
|
|
|
// Check if the pool is paused and wait until it's resumed.
|
2023-10-20 02:49:05 -04:00
|
|
|
pool.wait_paused().await;
|
2022-09-23 11:08:38 -05:00
|
|
|
|
2023-10-20 02:49:05 -04:00
|
|
|
// Refresh pool information, something might have changed.
|
|
|
|
|
pool = self.get_pool().await?;
|
|
|
|
|
query_router.update_pool_settings(&pool.settings);
|
2023-01-28 15:36:35 -08:00
|
|
|
|
2022-02-22 19:26:08 -08:00
|
|
|
debug!("Waiting for connection from pool");
|
2023-03-28 17:19:37 +02:00
|
|
|
if !self.admin {
|
|
|
|
|
self.stats.waiting();
|
|
|
|
|
}
|
2022-02-22 19:26:08 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
// Grab a server from the pool.
|
2022-03-04 17:04:27 -08:00
|
|
|
let connection = match pool
|
2023-03-28 17:19:37 +02:00
|
|
|
.get(query_router.shard(), query_router.role(), &self.stats)
|
2022-03-04 17:04:27 -08:00
|
|
|
.await
|
|
|
|
|
{
|
2022-02-22 19:26:08 -08:00
|
|
|
Ok(conn) => {
|
|
|
|
|
debug!("Got connection from pool");
|
|
|
|
|
conn
|
|
|
|
|
}
|
2022-02-09 21:19:14 -08:00
|
|
|
Err(err) => {
|
2022-09-01 17:02:39 -05:00
|
|
|
// Client is attempting to get results from the server,
|
|
|
|
|
// but we were unable to grab a connection from the pool
|
|
|
|
|
// We'll send back an error message and clean the extended
|
|
|
|
|
// protocol buffer
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.idle();
|
|
|
|
|
|
2022-09-01 17:02:39 -05:00
|
|
|
if message[0] as char == 'S' {
|
|
|
|
|
error!("Got Sync message but failed to get a connection from the pool");
|
2023-10-25 18:11:57 -04:00
|
|
|
self.reset_buffered_state();
|
2022-09-01 17:02:39 -05:00
|
|
|
}
|
2023-04-10 14:51:01 -07:00
|
|
|
|
Allow configuring routing decision when no shard is selected (#578)
The TL;DR for the change is that we allow QueryRouter to set the active shard to None. This signals to the Pool::get method that we have no shard selected. The get method follows a no_shard_specified_behavior config to know how to route the query.
Original PR description
Ruby-pg library makes a startup query to SET client_encoding to ... if Encoding.default_internal value is set (Code). This query is troublesome because we cannot possibly attach a routing comment to it. PgCat, by default, will route that query to the default shard.
Everything is fine until shard 0 has issues, Clients will all be attempting to send this query to shard0 which increases the connection latency significantly for all clients, even those not interested in shard0
This PR introduces no_shard_specified_behavior that defines the behavior in case we have routing-by-comment enabled but we get a query without a comment. The allowed behaviors are
random: Picks a shard at random
random_healthy: Picks a shard at random favoring shards with the least number of recent connection/checkout errors
shard_<number>: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
In order to achieve this, this PR introduces an error_count on the Address Object that tracks the number of errors since the last checkout and uses that metric to sort shards by error count before making a routing decision.
I didn't want to use address stats to avoid introducing a routing dependency on internal stats (We might do that in the future but I prefer to avoid this for the time being.
I also made changes to the test environment to replace Ruby's TOML reader library, It appears to be abandoned and does not support mixed arrays (which we use in the config toml), and it also does not play nicely with single-quoted regular expressions. I opted for using yj which is a CLI tool that can convert from toml to JSON and back. So I refactor the tests to use that library.
2023-09-11 13:47:28 -05:00
|
|
|
error_response(
|
|
|
|
|
&mut self.write,
|
2023-09-11 16:39:31 -07:00
|
|
|
format!("could not get connection from the pool - {}", err).as_str(),
|
Allow configuring routing decision when no shard is selected (#578)
The TL;DR for the change is that we allow QueryRouter to set the active shard to None. This signals to the Pool::get method that we have no shard selected. The get method follows a no_shard_specified_behavior config to know how to route the query.
Original PR description
Ruby-pg library makes a startup query to SET client_encoding to ... if Encoding.default_internal value is set (Code). This query is troublesome because we cannot possibly attach a routing comment to it. PgCat, by default, will route that query to the default shard.
Everything is fine until shard 0 has issues, Clients will all be attempting to send this query to shard0 which increases the connection latency significantly for all clients, even those not interested in shard0
This PR introduces no_shard_specified_behavior that defines the behavior in case we have routing-by-comment enabled but we get a query without a comment. The allowed behaviors are
random: Picks a shard at random
random_healthy: Picks a shard at random favoring shards with the least number of recent connection/checkout errors
shard_<number>: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
In order to achieve this, this PR introduces an error_count on the Address Object that tracks the number of errors since the last checkout and uses that metric to sort shards by error count before making a routing decision.
I didn't want to use address stats to avoid introducing a routing dependency on internal stats (We might do that in the future but I prefer to avoid this for the time being.
I also made changes to the test environment to replace Ruby's TOML reader library, It appears to be abandoned and does not support mixed arrays (which we use in the config toml), and it also does not play nicely with single-quoted regular expressions. I opted for using yj which is a CLI tool that can convert from toml to JSON and back. So I refactor the tests to use that library.
2023-09-11 13:47:28 -05:00
|
|
|
)
|
|
|
|
|
.await?;
|
2022-09-07 10:24:07 -05:00
|
|
|
|
2023-04-10 14:51:01 -07:00
|
|
|
error!(
|
|
|
|
|
"Could not get connection from pool: \
|
|
|
|
|
{{ \
|
|
|
|
|
pool_name: {:?}, \
|
|
|
|
|
username: {:?}, \
|
|
|
|
|
shard: {:?}, \
|
|
|
|
|
role: \"{:?}\", \
|
|
|
|
|
error: \"{:?}\" \
|
|
|
|
|
}}",
|
|
|
|
|
self.pool_name,
|
|
|
|
|
self.username,
|
|
|
|
|
query_router.shard(),
|
|
|
|
|
query_router.role(),
|
|
|
|
|
err
|
|
|
|
|
);
|
|
|
|
|
|
2022-02-16 22:52:11 -08:00
|
|
|
continue;
|
2022-02-09 21:19:14 -08:00
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
let mut reference = connection.0;
|
2022-03-04 17:04:27 -08:00
|
|
|
let address = connection.1;
|
2022-02-15 22:45:45 -08:00
|
|
|
let server = &mut *reference;
|
2022-02-03 16:25:05 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
// Server is assigned to the client in case the client wants to
|
|
|
|
|
// cancel a query later.
|
2022-02-04 16:01:35 -08:00
|
|
|
server.claim(self.process_id, self.secret_key);
|
2022-08-22 11:52:34 -07:00
|
|
|
self.connected_to_server = true;
|
2022-02-03 17:06:19 -08:00
|
|
|
|
2022-09-14 10:20:41 -05:00
|
|
|
// Update statistics
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.active();
|
2022-03-10 01:33:29 -08:00
|
|
|
|
2022-03-04 17:04:27 -08:00
|
|
|
self.last_address_id = Some(address.id);
|
2023-03-28 17:19:37 +02:00
|
|
|
self.last_server_stats = Some(server.stats());
|
2022-02-20 12:40:09 -08:00
|
|
|
|
2022-02-22 19:26:08 -08:00
|
|
|
debug!(
|
2022-06-27 16:45:41 -07:00
|
|
|
"Client {:?} talking to server {:?}",
|
|
|
|
|
self.addr,
|
2022-02-22 19:26:08 -08:00
|
|
|
server.address()
|
|
|
|
|
);
|
|
|
|
|
|
2023-08-10 11:18:46 -04:00
|
|
|
server.sync_parameters(&self.server_parameters).await?;
|
2022-06-03 00:15:50 -07:00
|
|
|
|
2022-10-13 22:33:12 -04:00
|
|
|
let mut initial_message = Some(message);
|
|
|
|
|
|
2023-03-24 11:20:30 -04:00
|
|
|
let idle_client_timeout_duration = match get_idle_client_in_transaction_timeout() {
|
|
|
|
|
0 => tokio::time::Duration::MAX,
|
|
|
|
|
timeout => tokio::time::Duration::from_millis(timeout),
|
|
|
|
|
};
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Transaction loop. Multiple queries can be issued by the client here.
|
|
|
|
|
// The connection belongs to the client until the transaction is over,
|
|
|
|
|
// or until the client disconnects if we are in session mode.
|
2022-03-10 01:33:29 -08:00
|
|
|
//
|
|
|
|
|
// If the client is in session mode, no more custom protocol
|
|
|
|
|
// commands will be accepted.
|
2022-02-03 16:25:05 -08:00
|
|
|
loop {
|
2023-10-25 18:11:57 -04:00
|
|
|
let message = match initial_message {
|
2022-10-13 22:33:12 -04:00
|
|
|
None => {
|
|
|
|
|
trace!("Waiting for message inside transaction or in session mode");
|
2022-02-22 19:26:08 -08:00
|
|
|
|
2023-08-08 16:10:03 -04:00
|
|
|
// This is not an initial message so discard the initial_parsed_ast
|
|
|
|
|
initial_parsed_ast.take();
|
|
|
|
|
|
2023-03-24 11:20:30 -04:00
|
|
|
match tokio::time::timeout(
|
|
|
|
|
idle_client_timeout_duration,
|
|
|
|
|
read_message(&mut self.read),
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(Ok(message)) => message,
|
|
|
|
|
Ok(Err(err)) => {
|
2022-10-13 22:33:12 -04:00
|
|
|
// Client disconnected inside a transaction.
|
|
|
|
|
// Clean up the server and re-use it.
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.disconnect();
|
2022-10-13 22:33:12 -04:00
|
|
|
server.checkin_cleanup().await?;
|
2022-02-03 17:06:19 -08:00
|
|
|
|
2022-10-13 22:33:12 -04:00
|
|
|
return Err(err);
|
|
|
|
|
}
|
2023-03-24 11:20:30 -04:00
|
|
|
Err(_) => {
|
|
|
|
|
// Client idle in transaction timeout
|
|
|
|
|
error_response(&mut self.write, "idle transaction timeout").await?;
|
2023-04-10 14:51:01 -07:00
|
|
|
error!(
|
|
|
|
|
"Client idle in transaction timeout: \
|
|
|
|
|
{{ \
|
|
|
|
|
pool_name: {}, \
|
|
|
|
|
username: {}, \
|
Allow configuring routing decision when no shard is selected (#578)
The TL;DR for the change is that we allow QueryRouter to set the active shard to None. This signals to the Pool::get method that we have no shard selected. The get method follows a no_shard_specified_behavior config to know how to route the query.
Original PR description
Ruby-pg library makes a startup query to SET client_encoding to ... if Encoding.default_internal value is set (Code). This query is troublesome because we cannot possibly attach a routing comment to it. PgCat, by default, will route that query to the default shard.
Everything is fine until shard 0 has issues, Clients will all be attempting to send this query to shard0 which increases the connection latency significantly for all clients, even those not interested in shard0
This PR introduces no_shard_specified_behavior that defines the behavior in case we have routing-by-comment enabled but we get a query without a comment. The allowed behaviors are
random: Picks a shard at random
random_healthy: Picks a shard at random favoring shards with the least number of recent connection/checkout errors
shard_<number>: e.g. shard_0, shard_4, etc. picks a specific shard, everytime
In order to achieve this, this PR introduces an error_count on the Address Object that tracks the number of errors since the last checkout and uses that metric to sort shards by error count before making a routing decision.
I didn't want to use address stats to avoid introducing a routing dependency on internal stats (We might do that in the future but I prefer to avoid this for the time being.
I also made changes to the test environment to replace Ruby's TOML reader library, It appears to be abandoned and does not support mixed arrays (which we use in the config toml), and it also does not play nicely with single-quoted regular expressions. I opted for using yj which is a CLI tool that can convert from toml to JSON and back. So I refactor the tests to use that library.
2023-09-11 13:47:28 -05:00
|
|
|
shard: {:?}, \
|
2023-04-10 14:51:01 -07:00
|
|
|
role: \"{:?}\" \
|
|
|
|
|
}}",
|
|
|
|
|
self.pool_name,
|
|
|
|
|
self.username,
|
|
|
|
|
query_router.shard(),
|
|
|
|
|
query_router.role()
|
|
|
|
|
);
|
|
|
|
|
|
2023-03-24 11:20:30 -04:00
|
|
|
break;
|
|
|
|
|
}
|
2022-02-06 10:48:14 -08:00
|
|
|
}
|
2022-02-03 17:06:19 -08:00
|
|
|
}
|
2023-04-10 14:51:01 -07:00
|
|
|
|
2022-10-13 22:33:12 -04:00
|
|
|
Some(message) => {
|
|
|
|
|
initial_message = None;
|
|
|
|
|
message
|
|
|
|
|
}
|
2022-02-03 17:06:19 -08:00
|
|
|
};
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// The message will be forwarded to the server intact. We still would like to
|
|
|
|
|
// parse it below to figure out what to do with it.
|
|
|
|
|
|
2022-10-13 22:33:12 -04:00
|
|
|
// Safe to unwrap because we know this message has a certain length and has the code
|
|
|
|
|
// This reads the first byte without advancing the internal pointer and mutating the bytes
|
2023-10-10 09:18:21 -07:00
|
|
|
let code = *message.first().unwrap() as char;
|
2022-02-03 16:25:05 -08:00
|
|
|
|
2023-11-28 21:13:30 -08:00
|
|
|
trace!("Client message: {}", code);
|
2022-02-22 19:26:08 -08:00
|
|
|
|
2022-02-03 16:25:05 -08:00
|
|
|
match code {
|
2022-10-13 22:33:12 -04:00
|
|
|
// Query
|
2022-02-03 16:25:05 -08:00
|
|
|
'Q' => {
|
2023-05-03 09:13:05 -07:00
|
|
|
if query_router.query_parser_enabled() {
|
2023-08-08 16:10:03 -04:00
|
|
|
// We don't want to parse again if we already parsed it as the initial message
|
|
|
|
|
let ast = match initial_parsed_ast {
|
|
|
|
|
Some(_) => Some(initial_parsed_ast.take().unwrap()),
|
|
|
|
|
None => match query_router.parse(&message) {
|
|
|
|
|
Ok(ast) => Some(ast),
|
|
|
|
|
Err(error) => {
|
|
|
|
|
warn!(
|
|
|
|
|
"Query parsing error: {} (client: {})",
|
|
|
|
|
error, client_identifier
|
|
|
|
|
);
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if let Some(ast) = ast {
|
2023-05-03 09:13:05 -07:00
|
|
|
let plugin_result = query_router.execute_plugins(&ast).await;
|
|
|
|
|
|
|
|
|
|
match plugin_result {
|
|
|
|
|
Ok(PluginOutput::Deny(error)) => {
|
|
|
|
|
error_response(&mut self.write, &error).await?;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(PluginOutput::Intercept(result)) => {
|
|
|
|
|
write_all(&mut self.write, result).await?;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ => (),
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-11-28 21:13:30 -08:00
|
|
|
|
2022-02-22 19:26:08 -08:00
|
|
|
debug!("Sending query to server");
|
|
|
|
|
|
2023-03-28 17:19:37 +02:00
|
|
|
self.send_and_receive_loop(
|
|
|
|
|
code,
|
|
|
|
|
Some(&message),
|
|
|
|
|
server,
|
|
|
|
|
&address,
|
|
|
|
|
&pool,
|
|
|
|
|
&self.stats.clone(),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
2022-02-04 08:26:50 -08:00
|
|
|
|
2022-02-14 10:00:55 -08:00
|
|
|
if !server.in_transaction() {
|
2022-02-15 22:45:45 -08:00
|
|
|
// Report transaction executed statistics.
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.transaction();
|
2023-08-10 11:18:46 -04:00
|
|
|
server
|
|
|
|
|
.stats()
|
2023-10-10 09:18:21 -07:00
|
|
|
.transaction(self.server_parameters.get_application_name());
|
2022-02-14 10:00:55 -08:00
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Release server back to the pool if we are in transaction mode.
|
|
|
|
|
// If we are in session mode, we keep the server until the client disconnects.
|
2023-07-21 01:06:01 -05:00
|
|
|
if self.transaction_mode && !server.in_copy_mode() {
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.idle();
|
|
|
|
|
|
2022-02-14 10:00:55 -08:00
|
|
|
break;
|
|
|
|
|
}
|
2022-02-03 16:25:05 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Terminate
|
2022-02-03 16:25:05 -08:00
|
|
|
'X' => {
|
2022-09-01 22:06:55 -05:00
|
|
|
server.checkin_cleanup().await?;
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.disconnect();
|
2022-05-18 16:24:03 -07:00
|
|
|
self.release();
|
|
|
|
|
|
2022-02-03 16:25:05 -08:00
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Parse
|
|
|
|
|
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
2022-02-03 16:25:05 -08:00
|
|
|
'P' => {
|
2023-05-03 09:13:05 -07:00
|
|
|
if query_router.query_parser_enabled() {
|
2023-08-08 16:10:03 -04:00
|
|
|
if let Ok(ast) = query_router.parse(&message) {
|
2023-05-03 09:13:05 -07:00
|
|
|
if let Ok(output) = query_router.execute_plugins(&ast).await {
|
|
|
|
|
plugin_output = Some(output);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.buffer_parse(message, &pool)?;
|
2022-02-03 16:25:05 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Bind
|
|
|
|
|
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
|
2022-02-03 16:25:05 -08:00
|
|
|
'B' => {
|
2023-10-25 18:11:57 -04:00
|
|
|
self.buffer_bind(message).await?;
|
2022-02-03 16:25:05 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-06 10:48:14 -08:00
|
|
|
// Describe
|
2022-02-15 22:45:45 -08:00
|
|
|
// Command a client can issue to describe a previously prepared named statement.
|
2022-02-03 16:25:05 -08:00
|
|
|
'D' => {
|
2023-10-25 18:11:57 -04:00
|
|
|
self.buffer_describe(message).await?;
|
|
|
|
|
}
|
2023-06-16 12:57:44 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// Execute
|
|
|
|
|
// Execute a prepared statement prepared in `P` and bound in `B`.
|
|
|
|
|
'E' => {
|
|
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_execute(message));
|
2022-02-03 16:25:05 -08:00
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// Close
|
2023-04-18 09:57:17 -07:00
|
|
|
// Close the prepared statement.
|
|
|
|
|
'C' => {
|
2023-10-25 18:11:57 -04:00
|
|
|
let close: Close = (&message).try_into()?;
|
2023-06-18 23:02:34 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_close(message, close));
|
2022-02-03 16:25:05 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Sync
|
|
|
|
|
// Frontend (client) is asking for the query result now.
|
2022-02-03 16:25:05 -08:00
|
|
|
'S' => {
|
2022-02-22 19:26:08 -08:00
|
|
|
debug!("Sending query to server");
|
|
|
|
|
|
2023-05-03 09:13:05 -07:00
|
|
|
match plugin_output {
|
|
|
|
|
Some(PluginOutput::Deny(error)) => {
|
|
|
|
|
error_response(&mut self.write, &error).await?;
|
|
|
|
|
plugin_output = None;
|
2023-10-25 18:11:57 -04:00
|
|
|
self.reset_buffered_state();
|
2023-05-03 09:13:05 -07:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Some(PluginOutput::Intercept(result)) => {
|
|
|
|
|
write_all(&mut self.write, result).await?;
|
|
|
|
|
plugin_output = None;
|
2023-10-25 18:11:57 -04:00
|
|
|
self.reset_buffered_state();
|
2023-05-03 09:13:05 -07:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ => (),
|
|
|
|
|
};
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// Prepared statements can arrive like this
|
|
|
|
|
// 1. Without named describe
|
|
|
|
|
// Client: Parse, with name, query and params
|
|
|
|
|
// Sync
|
|
|
|
|
// Server: ParseComplete
|
|
|
|
|
// ReadyForQuery
|
|
|
|
|
// 3. Without named describe
|
|
|
|
|
// Client: Parse, with name, query and params
|
|
|
|
|
// Describe, with no name
|
|
|
|
|
// Sync
|
|
|
|
|
// Server: ParseComplete
|
|
|
|
|
// ParameterDescription
|
|
|
|
|
// RowDescription
|
|
|
|
|
// ReadyForQuery
|
|
|
|
|
// 2. With named describe
|
|
|
|
|
// Client: Parse, with name, query and params
|
|
|
|
|
// Describe, with name
|
|
|
|
|
// Sync
|
|
|
|
|
// Server: ParseComplete
|
|
|
|
|
// ParameterDescription
|
|
|
|
|
// RowDescription
|
|
|
|
|
// ReadyForQuery
|
|
|
|
|
|
|
|
|
|
// Iterate over our extended protocol data that we've buffered
|
|
|
|
|
while let Some(protocol_data) =
|
|
|
|
|
self.extended_protocol_data_buffer.pop_front()
|
|
|
|
|
{
|
|
|
|
|
match protocol_data {
|
|
|
|
|
ExtendedProtocolData::Parse { data, metadata } => {
|
2023-11-28 21:13:30 -08:00
|
|
|
debug!("Have parse in extended buffer");
|
2023-10-25 18:11:57 -04:00
|
|
|
let (parse, hash) = match metadata {
|
|
|
|
|
Some(metadata) => metadata,
|
|
|
|
|
None => {
|
|
|
|
|
let first_char_in_name = *data.get(5).unwrap_or(&0);
|
|
|
|
|
if first_char_in_name != 0 {
|
|
|
|
|
// This is a named prepared statement while prepared statements are disabled
|
|
|
|
|
// Server connection state will need to be cleared at checkin
|
|
|
|
|
server.mark_dirty();
|
|
|
|
|
}
|
|
|
|
|
// Not a prepared statement
|
|
|
|
|
self.buffer.put(&data[..]);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// This is a prepared statement we already have on the checked out server
|
|
|
|
|
if server.has_prepared_statement(&parse.name) {
|
|
|
|
|
debug!(
|
|
|
|
|
"Prepared statement `{}` found in server cache",
|
|
|
|
|
parse.name
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// We don't want to send the parse message to the server
|
|
|
|
|
// Instead queue up a parse complete message to send to the client
|
|
|
|
|
self.response_message_queue_buffer.put(parse_complete());
|
|
|
|
|
} else {
|
|
|
|
|
debug!(
|
|
|
|
|
"Prepared statement `{}` not found in server cache",
|
|
|
|
|
parse.name
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// TODO: Consider adding the close logic that this function can send for eviction to the client buffer instead
|
|
|
|
|
// In this case we don't want to send the parse message to the server since the client is sending it
|
|
|
|
|
self.register_parse_to_server_cache(
|
|
|
|
|
false, &hash, &parse, &pool, server, &address,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
// Add parse message to buffer
|
|
|
|
|
self.buffer.put(&data[..]);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ExtendedProtocolData::Bind { data, metadata } => {
|
|
|
|
|
// This is using a prepared statement
|
|
|
|
|
if let Some(client_given_name) = metadata {
|
|
|
|
|
self.ensure_prepared_statement_is_on_server(
|
|
|
|
|
client_given_name,
|
|
|
|
|
&pool,
|
|
|
|
|
server,
|
|
|
|
|
&address,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.buffer.put(&data[..]);
|
|
|
|
|
}
|
|
|
|
|
ExtendedProtocolData::Describe { data, metadata } => {
|
|
|
|
|
// This is using a prepared statement
|
|
|
|
|
if let Some(client_given_name) = metadata {
|
|
|
|
|
self.ensure_prepared_statement_is_on_server(
|
|
|
|
|
client_given_name,
|
|
|
|
|
&pool,
|
|
|
|
|
server,
|
|
|
|
|
&address,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.buffer.put(&data[..]);
|
|
|
|
|
}
|
|
|
|
|
ExtendedProtocolData::Execute { data } => {
|
|
|
|
|
self.buffer.put(&data[..])
|
|
|
|
|
}
|
|
|
|
|
ExtendedProtocolData::Close { data, close } => {
|
|
|
|
|
// We don't send the close message to the server if prepared statements are enabled
|
|
|
|
|
// and it's a close with a prepared statement name provided
|
|
|
|
|
if self.prepared_statements_enabled
|
|
|
|
|
&& close.is_prepared_statement()
|
|
|
|
|
&& !close.anonymous()
|
|
|
|
|
{
|
|
|
|
|
self.prepared_statements.remove(&close.name);
|
|
|
|
|
|
|
|
|
|
// Queue up a close complete message to send to the client
|
|
|
|
|
self.response_message_queue_buffer.put(close_complete());
|
|
|
|
|
} else {
|
|
|
|
|
self.buffer.put(&data[..]);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add the sync message
|
2022-10-13 22:33:12 -04:00
|
|
|
self.buffer.put(&message[..]);
|
2022-02-09 21:19:14 -08:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
let mut should_send_to_server = true;
|
|
|
|
|
|
|
|
|
|
// If we have just a sync message left (maybe after omitting sending some messages to the server) no need to send it to the server
|
|
|
|
|
if *self.buffer.first().unwrap() == b'S' {
|
|
|
|
|
should_send_to_server = false;
|
|
|
|
|
// queue up a ready for query message to send to the client, respecting the transaction state of the server
|
|
|
|
|
self.response_message_queue_buffer
|
|
|
|
|
.put(ready_for_query(server.in_transaction()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Send all queued messages to the client
|
|
|
|
|
// NOTE: it's possible we don't perfectly send things back in the same order as postgres would,
|
|
|
|
|
// however clients should be able to handle this
|
|
|
|
|
if !self.response_message_queue_buffer.is_empty() {
|
|
|
|
|
if let Err(err) = write_all_flush(
|
|
|
|
|
&mut self.write,
|
|
|
|
|
&self.response_message_queue_buffer,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
// We might be in some kind of error/in between protocol state
|
2023-12-04 19:09:41 -05:00
|
|
|
server.mark_bad(err.to_string().as_str());
|
2023-10-25 18:11:57 -04:00
|
|
|
return Err(err);
|
2022-09-07 22:37:17 -05:00
|
|
|
}
|
2023-10-25 18:11:57 -04:00
|
|
|
|
|
|
|
|
self.response_message_queue_buffer.clear();
|
2022-09-07 22:37:17 -05:00
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
if should_send_to_server {
|
|
|
|
|
self.send_and_receive_loop(
|
|
|
|
|
code,
|
|
|
|
|
None,
|
|
|
|
|
server,
|
|
|
|
|
&address,
|
|
|
|
|
&pool,
|
|
|
|
|
&self.stats.clone(),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
}
|
2022-02-15 22:45:45 -08:00
|
|
|
|
2022-02-03 16:25:05 -08:00
|
|
|
self.buffer.clear();
|
|
|
|
|
|
2022-02-14 10:00:55 -08:00
|
|
|
if !server.in_transaction() {
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.transaction();
|
2023-08-10 11:18:46 -04:00
|
|
|
server
|
|
|
|
|
.stats()
|
2023-10-10 09:18:21 -07:00
|
|
|
.transaction(self.server_parameters.get_application_name());
|
2022-02-14 10:00:55 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
// Release server back to the pool if we are in transaction mode.
|
|
|
|
|
// If we are in session mode, we keep the server until the client disconnects.
|
2023-07-21 01:06:01 -05:00
|
|
|
if self.transaction_mode && !server.in_copy_mode() {
|
2022-02-14 10:00:55 -08:00
|
|
|
break;
|
|
|
|
|
}
|
2022-02-03 16:25:05 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-04 08:06:45 -08:00
|
|
|
// CopyData
|
|
|
|
|
'd' => {
|
2023-01-17 20:39:55 -05:00
|
|
|
self.buffer.put(&message[..]);
|
|
|
|
|
|
|
|
|
|
// Want to limit buffer size
|
|
|
|
|
if self.buffer.len() > 8196 {
|
|
|
|
|
// Forward the data to the server,
|
|
|
|
|
self.send_server_message(server, &self.buffer, &address, &pool)
|
|
|
|
|
.await?;
|
|
|
|
|
self.buffer.clear();
|
|
|
|
|
}
|
2022-02-04 08:06:45 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// CopyDone or CopyFail
|
|
|
|
|
// Copy is done, successfully or not.
|
2022-02-04 08:06:45 -08:00
|
|
|
'c' | 'f' => {
|
2023-01-17 20:39:55 -05:00
|
|
|
// We may already have some copy data in the buffer, add this message to buffer
|
|
|
|
|
self.buffer.put(&message[..]);
|
|
|
|
|
|
|
|
|
|
self.send_server_message(server, &self.buffer, &address, &pool)
|
2022-08-11 17:42:40 -04:00
|
|
|
.await?;
|
2022-02-15 22:45:45 -08:00
|
|
|
|
2023-01-17 20:39:55 -05:00
|
|
|
// Clear the buffer
|
|
|
|
|
self.buffer.clear();
|
|
|
|
|
|
2023-03-28 17:19:37 +02:00
|
|
|
let response = self
|
|
|
|
|
.receive_server_message(server, &address, &pool, &self.stats.clone())
|
|
|
|
|
.await?;
|
2022-08-25 06:40:56 -07:00
|
|
|
|
2023-05-18 16:41:22 -07:00
|
|
|
match write_all_flush(&mut self.write, &response).await {
|
2022-02-04 08:06:45 -08:00
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(err) => {
|
2023-12-04 19:09:41 -05:00
|
|
|
server.mark_bad(err.to_string().as_str());
|
2022-02-04 08:06:45 -08:00
|
|
|
return Err(err);
|
|
|
|
|
}
|
|
|
|
|
};
|
2022-02-05 15:23:21 -08:00
|
|
|
|
2022-02-14 10:00:55 -08:00
|
|
|
if !server.in_transaction() {
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.transaction();
|
2023-08-10 11:18:46 -04:00
|
|
|
server
|
|
|
|
|
.stats()
|
|
|
|
|
.transaction(self.server_parameters.get_application_name());
|
2022-02-14 10:00:55 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
// Release server back to the pool if we are in transaction mode.
|
|
|
|
|
// If we are in session mode, we keep the server until the client disconnects.
|
2022-02-14 10:00:55 -08:00
|
|
|
if self.transaction_mode {
|
|
|
|
|
break;
|
|
|
|
|
}
|
2022-02-05 15:23:21 -08:00
|
|
|
}
|
2022-02-04 08:06:45 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// Some unexpected message. We either did not implement the protocol correctly
|
|
|
|
|
// or this is not a Postgres client we're talking to.
|
2022-02-03 16:25:05 -08:00
|
|
|
_ => {
|
2022-02-20 22:47:08 -08:00
|
|
|
error!("Unexpected code: {}", code);
|
2022-02-03 16:25:05 -08:00
|
|
|
}
|
2022-02-03 15:17:04 -08:00
|
|
|
}
|
2022-02-03 13:54:07 -08:00
|
|
|
}
|
2022-02-04 16:01:35 -08:00
|
|
|
|
2022-02-15 22:45:45 -08:00
|
|
|
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
2022-02-22 19:26:08 -08:00
|
|
|
debug!("Releasing server back into the pool");
|
2023-06-18 23:02:34 -07:00
|
|
|
|
2022-09-01 22:06:55 -05:00
|
|
|
server.checkin_cleanup().await?;
|
2023-06-18 23:02:34 -07:00
|
|
|
|
2023-03-28 17:19:37 +02:00
|
|
|
server.stats().idle();
|
2022-08-22 11:52:34 -07:00
|
|
|
self.connected_to_server = false;
|
2022-09-01 22:06:55 -05:00
|
|
|
|
2022-02-04 16:01:35 -08:00
|
|
|
self.release();
|
2023-03-28 17:19:37 +02:00
|
|
|
self.stats.idle();
|
2022-02-03 13:54:07 -08:00
|
|
|
}
|
|
|
|
|
}
|
2022-02-04 16:01:35 -08:00
|
|
|
|
2023-01-28 15:36:35 -08:00
|
|
|
/// Retrieve connection pool, if it exists.
|
|
|
|
|
/// Return an error to the client otherwise.
|
|
|
|
|
async fn get_pool(&mut self) -> Result<ConnectionPool, Error> {
|
|
|
|
|
match get_pool(&self.pool_name, &self.username) {
|
|
|
|
|
Some(pool) => Ok(pool),
|
|
|
|
|
None => {
|
|
|
|
|
error_response(
|
|
|
|
|
&mut self.write,
|
|
|
|
|
&format!(
|
|
|
|
|
"No pool configured for database: {}, user: {}",
|
|
|
|
|
self.pool_name, self.username
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
Err(Error::ClientError(format!(
|
|
|
|
|
"Invalid pool name {{ username: {}, pool_name: {}, application_name: {} }}",
|
2023-08-10 11:18:46 -04:00
|
|
|
self.pool_name,
|
|
|
|
|
self.username,
|
|
|
|
|
self.server_parameters.get_application_name()
|
2023-01-28 15:36:35 -08:00
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
/// Handles custom protocol messages
|
|
|
|
|
/// Returns true if the message is custom protocol message, false otherwise
|
|
|
|
|
/// Does not work with prepared statements, only simple and extended protocol without parameters
|
|
|
|
|
async fn handle_custom_protocol(
|
|
|
|
|
&mut self,
|
|
|
|
|
query_router: &mut QueryRouter,
|
|
|
|
|
message: &BytesMut,
|
|
|
|
|
pool: &ConnectionPool,
|
|
|
|
|
) -> Result<bool, Error> {
|
|
|
|
|
let current_shard = query_router.shard();
|
|
|
|
|
|
|
|
|
|
match query_router.try_execute_command(message) {
|
|
|
|
|
None => Ok(false),
|
|
|
|
|
|
|
|
|
|
Some(custom) => {
|
|
|
|
|
match custom {
|
|
|
|
|
// SET SHARD TO
|
|
|
|
|
(Command::SetShard, _) => {
|
|
|
|
|
match query_router.shard() {
|
|
|
|
|
None => {}
|
|
|
|
|
Some(selected_shard) => {
|
|
|
|
|
if selected_shard >= pool.shards() {
|
|
|
|
|
// Bad shard number, send error message to client.
|
|
|
|
|
query_router.set_shard(current_shard);
|
|
|
|
|
|
|
|
|
|
error_response(
|
|
|
|
|
&mut self.write,
|
|
|
|
|
&format!(
|
|
|
|
|
"shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)",
|
|
|
|
|
selected_shard,
|
|
|
|
|
pool.shards(),
|
|
|
|
|
current_shard,
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
} else {
|
|
|
|
|
custom_protocol_response_ok(&mut self.write, "SET SHARD")
|
|
|
|
|
.await?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-06-16 12:57:44 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// SET PRIMARY READS TO
|
|
|
|
|
(Command::SetPrimaryReads, _) => {
|
|
|
|
|
custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?;
|
|
|
|
|
}
|
2023-06-16 12:57:44 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
// SET SHARDING KEY TO
|
|
|
|
|
(Command::SetShardingKey, _) => {
|
|
|
|
|
custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SET SERVER ROLE TO
|
|
|
|
|
(Command::SetServerRole, _) => {
|
|
|
|
|
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SHOW SERVER ROLE
|
|
|
|
|
(Command::ShowServerRole, value) => {
|
|
|
|
|
show_response(&mut self.write, "server role", &value).await?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SHOW SHARD
|
|
|
|
|
(Command::ShowShard, value) => {
|
|
|
|
|
show_response(&mut self.write, "shard", &value).await?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SHOW PRIMARY READS
|
|
|
|
|
(Command::ShowPrimaryReads, value) => {
|
|
|
|
|
show_response(&mut self.write, "primary reads", &value).await?;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(true)
|
|
|
|
|
}
|
2023-06-16 12:57:44 -07:00
|
|
|
}
|
2023-10-25 18:11:57 -04:00
|
|
|
}
|
2023-06-16 12:57:44 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
/// Makes sure the the checked out server has the prepared statement and sends it to the server if it doesn't
|
|
|
|
|
async fn ensure_prepared_statement_is_on_server(
|
|
|
|
|
&mut self,
|
|
|
|
|
client_name: String,
|
|
|
|
|
pool: &ConnectionPool,
|
|
|
|
|
server: &mut Server,
|
|
|
|
|
address: &Address,
|
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
match self.prepared_statements.get(&client_name) {
|
|
|
|
|
Some((parse, hash)) => {
|
2023-11-28 21:13:30 -08:00
|
|
|
debug!("Prepared statement `{}` found in cache", client_name);
|
2023-10-25 18:11:57 -04:00
|
|
|
// In this case we want to send the parse message to the server
|
|
|
|
|
// since pgcat is initiating the prepared statement on this specific server
|
2023-11-28 21:13:30 -08:00
|
|
|
match self
|
|
|
|
|
.register_parse_to_server_cache(true, hash, parse, pool, server, address)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(err) => match err {
|
|
|
|
|
Error::PreparedStatementError => {
|
|
|
|
|
debug!("Removed {} from client cache", client_name);
|
|
|
|
|
self.prepared_statements.remove(&client_name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ => {
|
|
|
|
|
return Err(err);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
}
|
2023-10-25 18:11:57 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
None => {
|
|
|
|
|
return Err(Error::ClientError(format!(
|
|
|
|
|
"prepared statement `{}` not found",
|
|
|
|
|
client_name
|
|
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Register the parse to the server cache and send it to the server if requested (ie. requested by pgcat)
|
|
|
|
|
///
|
|
|
|
|
/// Also updates the pool LRU that this parse was used recently
|
|
|
|
|
async fn register_parse_to_server_cache(
|
|
|
|
|
&self,
|
|
|
|
|
should_send_parse_to_server: bool,
|
|
|
|
|
hash: &u64,
|
|
|
|
|
parse: &Arc<Parse>,
|
|
|
|
|
pool: &ConnectionPool,
|
|
|
|
|
server: &mut Server,
|
|
|
|
|
address: &Address,
|
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
// We want to promote this in the pool's LRU
|
|
|
|
|
pool.promote_prepared_statement_hash(hash);
|
|
|
|
|
|
2023-11-28 21:13:30 -08:00
|
|
|
debug!("Checking for prepared statement {}", parse.name);
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
if let Err(err) = server
|
|
|
|
|
.register_prepared_statement(parse, should_send_parse_to_server)
|
|
|
|
|
.await
|
|
|
|
|
{
|
2023-11-28 21:13:30 -08:00
|
|
|
match err {
|
|
|
|
|
// Don't ban for this.
|
|
|
|
|
Error::PreparedStatementError => (),
|
|
|
|
|
_ => {
|
|
|
|
|
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
return Err(err);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Register and rewrite the parse statement to the clients statement cache
|
|
|
|
|
/// and also the pool's statement cache. Add it to extended protocol data.
|
|
|
|
|
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
|
|
|
|
// Avoid parsing if prepared statements not enabled
|
2023-11-09 01:36:45 +01:00
|
|
|
if !self.prepared_statements_enabled {
|
2023-10-25 18:11:57 -04:00
|
|
|
debug!("Anonymous parse message");
|
|
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-09 01:36:45 +01:00
|
|
|
let client_given_name = Parse::get_name(&message)?;
|
2023-10-25 18:11:57 -04:00
|
|
|
let parse: Parse = (&message).try_into()?;
|
|
|
|
|
|
|
|
|
|
// Compute the hash of the parse statement
|
|
|
|
|
let hash = parse.get_hash();
|
|
|
|
|
|
|
|
|
|
// Add the statement to the cache or check if we already have it
|
|
|
|
|
let new_parse = match pool.register_parse_to_cache(hash, &parse) {
|
|
|
|
|
Some(parse) => parse,
|
|
|
|
|
None => {
|
|
|
|
|
return Err(Error::ClientError(format!(
|
|
|
|
|
"Could not store Prepared statement `{}`",
|
|
|
|
|
client_given_name
|
|
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
};
|
2023-06-16 12:57:44 -07:00
|
|
|
|
|
|
|
|
debug!(
|
|
|
|
|
"Renamed prepared statement `{}` to `{}` and saved to cache",
|
2023-10-25 18:11:57 -04:00
|
|
|
client_given_name, new_parse.name
|
2023-06-16 12:57:44 -07:00
|
|
|
);
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.prepared_statements
|
|
|
|
|
.insert(client_given_name, (new_parse.clone(), hash));
|
2023-06-16 12:57:44 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_parse(
|
|
|
|
|
new_parse.as_ref().try_into()?,
|
|
|
|
|
Some((new_parse.clone(), hash)),
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
Ok(())
|
2023-06-16 12:57:44 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Rewrite the Bind (F) message to use the prepared statement name
|
|
|
|
|
/// saved in the client cache.
|
2023-10-25 18:11:57 -04:00
|
|
|
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
|
|
|
|
// Avoid parsing if prepared statements not enabled
|
2023-11-09 01:36:45 +01:00
|
|
|
if !self.prepared_statements_enabled {
|
2023-06-16 12:57:44 -07:00
|
|
|
debug!("Anonymous bind message");
|
2023-10-25 18:11:57 -04:00
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
|
|
|
|
return Ok(());
|
2023-06-16 12:57:44 -07:00
|
|
|
}
|
|
|
|
|
|
2023-11-09 01:36:45 +01:00
|
|
|
let client_given_name = Bind::get_name(&message)?;
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
match self.prepared_statements.get(&client_given_name) {
|
|
|
|
|
Some((rewritten_parse, _)) => {
|
|
|
|
|
let message = Bind::rename(message, &rewritten_parse.name)?;
|
2023-06-16 12:57:44 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
debug!(
|
|
|
|
|
"Rewrote bind `{}` to `{}`",
|
|
|
|
|
client_given_name, rewritten_parse.name
|
|
|
|
|
);
|
2023-06-16 12:57:44 -07:00
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.extended_protocol_data_buffer.push_back(
|
|
|
|
|
ExtendedProtocolData::create_new_bind(message, Some(client_given_name)),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Ok(())
|
2023-06-16 12:57:44 -07:00
|
|
|
}
|
|
|
|
|
None => {
|
2023-10-25 18:11:57 -04:00
|
|
|
debug!(
|
|
|
|
|
"Got bind for unknown prepared statement {:?}",
|
|
|
|
|
client_given_name
|
|
|
|
|
);
|
2023-06-16 12:57:44 -07:00
|
|
|
|
|
|
|
|
error_response(
|
|
|
|
|
&mut self.write,
|
|
|
|
|
&format!(
|
|
|
|
|
"prepared statement \"{}\" does not exist",
|
2023-10-25 18:11:57 -04:00
|
|
|
client_given_name
|
2023-06-16 12:57:44 -07:00
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
Err(Error::ClientError(format!(
|
|
|
|
|
"Prepared statement `{}` doesn't exist",
|
2023-10-25 18:11:57 -04:00
|
|
|
client_given_name
|
2023-06-16 12:57:44 -07:00
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Rewrite the Describe (F) message to use the prepared statement name
|
|
|
|
|
/// saved in the client cache.
|
2023-10-25 18:11:57 -04:00
|
|
|
async fn buffer_describe(&mut self, message: BytesMut) -> Result<(), Error> {
|
|
|
|
|
// Avoid parsing if prepared statements not enabled
|
2023-11-09 01:36:45 +01:00
|
|
|
if !self.prepared_statements_enabled {
|
2023-10-25 18:11:57 -04:00
|
|
|
debug!("Anonymous describe message");
|
|
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
|
|
|
|
|
|
|
|
|
return Ok(());
|
2023-06-16 12:57:44 -07:00
|
|
|
}
|
|
|
|
|
|
2023-11-09 01:36:45 +01:00
|
|
|
let describe: Describe = (&message).try_into()?;
|
|
|
|
|
if describe.target == 'P' {
|
|
|
|
|
debug!("Portal describe message");
|
|
|
|
|
self.extended_protocol_data_buffer
|
|
|
|
|
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
|
|
|
|
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
let client_given_name = describe.statement_name.clone();
|
|
|
|
|
|
|
|
|
|
match self.prepared_statements.get(&client_given_name) {
|
|
|
|
|
Some((rewritten_parse, _)) => {
|
|
|
|
|
let describe = describe.rename(&rewritten_parse.name);
|
2023-06-16 12:57:44 -07:00
|
|
|
|
|
|
|
|
debug!(
|
|
|
|
|
"Rewrote describe `{}` to `{}`",
|
2023-10-25 18:11:57 -04:00
|
|
|
client_given_name, describe.statement_name
|
2023-06-16 12:57:44 -07:00
|
|
|
);
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
self.extended_protocol_data_buffer.push_back(
|
|
|
|
|
ExtendedProtocolData::create_new_describe(
|
|
|
|
|
describe.try_into()?,
|
|
|
|
|
Some(client_given_name),
|
|
|
|
|
),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Ok(())
|
2023-06-16 12:57:44 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
None => {
|
|
|
|
|
debug!("Got describe for unknown prepared statement {:?}", describe);
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
error_response(
|
|
|
|
|
&mut self.write,
|
|
|
|
|
&format!(
|
|
|
|
|
"prepared statement \"{}\" does not exist",
|
|
|
|
|
client_given_name
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
Err(Error::ClientError(format!(
|
|
|
|
|
"Prepared statement `{}` doesn't exist",
|
|
|
|
|
client_given_name
|
|
|
|
|
)))
|
2023-06-16 12:57:44 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-25 18:11:57 -04:00
|
|
|
    /// Discard all partially-buffered protocol state: the raw query buffer,
    /// any queued extended-protocol (Parse/Bind/Describe/Execute) data, and
    /// any responses queued for the client.
    fn reset_buffered_state(&mut self) {
        self.buffer.clear();
        self.extended_protocol_data_buffer.clear();
        self.response_message_queue_buffer.clear();
    }
|
|
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Release the server from the client: it can't cancel its queries anymore.
|
2022-02-11 11:19:40 -08:00
|
|
|
pub fn release(&self) {
|
2022-02-24 08:44:41 -08:00
|
|
|
let mut guard = self.client_server_map.lock();
|
2022-02-04 16:01:35 -08:00
|
|
|
guard.remove(&(self.process_id, self.secret_key));
|
|
|
|
|
}
|
2022-08-11 17:42:40 -04:00
|
|
|
|
2022-08-18 08:12:38 -04:00
|
|
|
async fn send_and_receive_loop(
|
|
|
|
|
&mut self,
|
|
|
|
|
code: char,
|
2023-01-17 20:39:55 -05:00
|
|
|
message: Option<&BytesMut>,
|
2022-08-18 08:12:38 -04:00
|
|
|
server: &mut Server,
|
|
|
|
|
address: &Address,
|
|
|
|
|
pool: &ConnectionPool,
|
2023-03-28 17:19:37 +02:00
|
|
|
client_stats: &ClientStats,
|
2022-08-18 08:12:38 -04:00
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
debug!("Sending {} to server", code);
|
|
|
|
|
|
2023-01-17 20:39:55 -05:00
|
|
|
let message = match message {
|
|
|
|
|
Some(message) => message,
|
|
|
|
|
None => &self.buffer,
|
|
|
|
|
};
|
|
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
self.send_server_message(server, message, address, pool)
|
2022-08-18 08:12:38 -04:00
|
|
|
.await?;
|
|
|
|
|
|
2022-09-15 01:21:45 -05:00
|
|
|
let query_start = Instant::now();
|
2022-08-18 08:12:38 -04:00
|
|
|
// Read all data the server has to offer, which can be multiple messages
|
|
|
|
|
// buffered in 8196 bytes chunks.
|
|
|
|
|
loop {
|
2023-03-28 17:19:37 +02:00
|
|
|
let response = self
|
|
|
|
|
.receive_server_message(server, address, pool, client_stats)
|
|
|
|
|
.await?;
|
2022-08-18 08:12:38 -04:00
|
|
|
|
2023-05-18 16:41:22 -07:00
|
|
|
match write_all_flush(&mut self.write, &response).await {
|
2022-08-18 08:12:38 -04:00
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(err) => {
|
2023-10-25 18:11:57 -04:00
|
|
|
// We might be in some kind of error/in between protocol state, better to just kill this server
|
2023-12-04 19:09:41 -05:00
|
|
|
server.mark_bad(err.to_string().as_str());
|
2022-08-18 08:12:38 -04:00
|
|
|
return Err(err);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if !server.is_data_available() {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Report query executed statistics.
|
2023-03-28 17:19:37 +02:00
|
|
|
client_stats.query();
|
|
|
|
|
server.stats().query(
|
|
|
|
|
Instant::now().duration_since(query_start).as_millis() as u64,
|
2023-10-10 09:18:21 -07:00
|
|
|
self.server_parameters.get_application_name(),
|
2022-09-15 01:21:45 -05:00
|
|
|
);
|
2022-08-18 08:12:38 -04:00
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2022-08-11 17:42:40 -04:00
|
|
|
async fn send_server_message(
|
|
|
|
|
&self,
|
|
|
|
|
server: &mut Server,
|
2023-01-17 20:39:55 -05:00
|
|
|
message: &BytesMut,
|
2022-08-11 17:42:40 -04:00
|
|
|
address: &Address,
|
|
|
|
|
pool: &ConnectionPool,
|
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
match server.send(message).await {
|
|
|
|
|
Ok(_) => Ok(()),
|
|
|
|
|
Err(err) => {
|
2023-03-28 17:19:37 +02:00
|
|
|
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
|
2022-08-11 17:42:40 -04:00
|
|
|
Err(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn receive_server_message(
|
2022-08-13 13:45:58 -07:00
|
|
|
&mut self,
|
2022-08-11 17:42:40 -04:00
|
|
|
server: &mut Server,
|
|
|
|
|
address: &Address,
|
|
|
|
|
pool: &ConnectionPool,
|
2023-03-28 17:19:37 +02:00
|
|
|
client_stats: &ClientStats,
|
2022-08-11 17:42:40 -04:00
|
|
|
) -> Result<BytesMut, Error> {
|
2023-08-10 11:18:46 -04:00
|
|
|
let statement_timeout_duration = match pool.settings.user.statement_timeout {
|
|
|
|
|
0 => tokio::time::Duration::MAX,
|
|
|
|
|
timeout => tokio::time::Duration::from_millis(timeout),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
match tokio::time::timeout(
|
|
|
|
|
statement_timeout_duration,
|
|
|
|
|
server.recv(Some(&mut self.server_parameters)),
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(result) => match result {
|
2022-08-13 13:45:58 -07:00
|
|
|
Ok(message) => Ok(message),
|
|
|
|
|
Err(err) => {
|
2023-03-28 17:19:37 +02:00
|
|
|
pool.ban(address, BanReason::MessageReceiveFailed, Some(client_stats));
|
2022-08-13 13:45:58 -07:00
|
|
|
error_response_terminal(
|
|
|
|
|
&mut self.write,
|
|
|
|
|
&format!("error receiving data from server: {:?}", err),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
Err(err)
|
|
|
|
|
}
|
2023-08-10 11:18:46 -04:00
|
|
|
},
|
|
|
|
|
Err(_) => {
|
2023-12-04 19:09:41 -05:00
|
|
|
server.mark_bad(
|
|
|
|
|
format!(
|
|
|
|
|
"Statement timeout while talking to {:?} with user {}",
|
|
|
|
|
address, pool.settings.user.username
|
|
|
|
|
)
|
|
|
|
|
.as_str(),
|
2023-08-10 11:18:46 -04:00
|
|
|
);
|
|
|
|
|
pool.ban(address, BanReason::StatementTimeout, Some(client_stats));
|
|
|
|
|
error_response_terminal(&mut self.write, "pool statement timeout").await?;
|
|
|
|
|
Err(Error::StatementTimeout)
|
2022-08-11 17:42:40 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-02-03 15:17:04 -08:00
|
|
|
}
|
2022-02-20 12:40:09 -08:00
|
|
|
|
2022-06-27 15:52:01 -07:00
|
|
|
impl<S, T> Drop for Client<S, T> {
|
2022-02-20 12:40:09 -08:00
|
|
|
fn drop(&mut self) {
|
2022-06-27 16:45:41 -07:00
|
|
|
let mut guard = self.client_server_map.lock();
|
|
|
|
|
guard.remove(&(self.process_id, self.secret_key));
|
|
|
|
|
|
2022-08-22 11:52:34 -07:00
|
|
|
// Dirty shutdown
|
|
|
|
|
// TODO: refactor, this is not the best way to handle state management.
|
2023-03-28 17:19:37 +02:00
|
|
|
if self.connected_to_server && self.last_server_stats.is_some() {
|
|
|
|
|
self.last_server_stats.as_ref().unwrap().idle();
|
2022-03-04 17:04:27 -08:00
|
|
|
}
|
2022-02-20 12:40:09 -08:00
|
|
|
}
|
|
|
|
|
}
|