2022-03-10 01:33:29 -08:00
/// Handle clients by pretending to be a PostgreSQL server.
2022-02-04 09:28:52 -08:00
use bytes ::{ Buf , BufMut , BytesMut } ;
2022-09-23 11:08:38 -05:00
use log ::{ debug , error , info , trace , warn } ;
2022-03-10 01:33:29 -08:00
use std ::collections ::HashMap ;
2022-09-15 01:21:45 -05:00
use std ::time ::Instant ;
2022-06-27 16:45:41 -07:00
use tokio ::io ::{ split , AsyncReadExt , BufReader , ReadHalf , WriteHalf } ;
use tokio ::net ::TcpStream ;
2022-08-08 19:01:24 -04:00
use tokio ::sync ::broadcast ::Receiver ;
2022-08-25 06:40:56 -07:00
use tokio ::sync ::mpsc ::Sender ;
2022-02-03 13:35:40 -08:00
2022-07-31 21:52:23 -05:00
use crate ::admin ::{ generate_server_info_for_admin , handle_admin } ;
2022-09-23 02:00:46 -04:00
use crate ::config ::{ get_config , Address , PoolMode } ;
2022-02-15 22:45:45 -08:00
use crate ::constants ::* ;
2022-02-03 13:35:40 -08:00
use crate ::errors ::Error ;
use crate ::messages ::* ;
2022-09-23 02:00:46 -04:00
use crate ::pool ::{ get_pool , ClientServerMap , ConnectionPool } ;
2022-02-19 08:57:24 -08:00
use crate ::query_router ::{ Command , QueryRouter } ;
2022-02-04 16:01:35 -08:00
use crate ::server ::Server ;
2022-06-24 14:52:38 -07:00
use crate ::stats ::{ get_reporter , Reporter } ;
2022-06-27 16:45:41 -07:00
use crate ::tls ::Tls ;
2022-06-27 15:52:01 -07:00
use tokio_rustls ::server ::TlsStream ;
/// Type of connection received from client.
///
/// Produced by `get_startup` from the 32-bit code that follows the length
/// prefix of the first message a client sends.
enum ClientConnectionType {
/// Regular startup: the code matched `PROTOCOL_VERSION_NUMBER`.
Startup ,
/// The client asked for an encrypted connection: the code matched `SSL_REQUEST_CODE`.
Tls ,
/// The client wants to cancel a running query: the code matched `CANCEL_REQUEST_CODE`.
CancelQuery ,
}
2022-02-08 13:11:50 -08:00
2022-02-06 10:48:14 -08:00
/// The client state. One of these is created per client.
2022-06-27 15:52:01 -07:00
pub struct Client < S , T > {
2022-03-10 01:33:29 -08:00
/// The reads are buffered (8K by default).
2022-06-27 15:52:01 -07:00
read : BufReader < S > ,
2022-02-06 10:48:14 -08:00
2022-03-10 01:33:29 -08:00
/// We buffer the writes ourselves because we know the protocol
/// better than a stock buffer.
2022-06-27 15:52:01 -07:00
write : T ,
2022-02-06 10:48:14 -08:00
2022-03-10 01:33:29 -08:00
/// Internal buffer, where we place messages until we have to flush
/// them to the backend.
2022-02-03 15:33:26 -08:00
buffer : BytesMut ,
2022-02-06 10:48:14 -08:00
2022-06-27 16:45:41 -07:00
/// Address
addr : std ::net ::SocketAddr ,
2022-03-10 01:33:29 -08:00
/// The client was started with the sole reason to cancel another running query.
2022-02-04 09:28:52 -08:00
cancel_mode : bool ,
2022-02-06 10:48:14 -08:00
2022-03-10 01:33:29 -08:00
/// In transaction mode, the connection is released after each transaction.
/// Session mode has slightly higher throughput per client, but lower capacity.
2022-02-05 15:23:21 -08:00
transaction_mode : bool ,
2022-02-06 10:48:14 -08:00
2022-03-10 01:33:29 -08:00
/// For query cancellation, the client is given a random process ID and secret on startup.
2022-02-04 09:28:52 -08:00
process_id : i32 ,
secret_key : i32 ,
2022-02-06 10:48:14 -08:00
2022-03-10 01:33:29 -08:00
/// Clients are mapped to servers while they use them. This allows a client
/// to connect and cancel a query.
2022-02-04 16:01:35 -08:00
client_server_map : ClientServerMap ,
2022-02-11 11:19:40 -08:00
2022-03-10 01:33:29 -08:00
/// Client parameters, e.g. user, client_encoding, etc.
2022-09-01 22:06:55 -05:00
#[ allow(dead_code) ]
2022-02-14 05:11:53 -08:00
parameters : HashMap < String , String > ,
2022-02-14 10:00:55 -08:00
2022-03-10 01:33:29 -08:00
/// Statistics
2022-02-14 10:00:55 -08:00
stats : Reporter ,
2022-02-25 18:20:15 -08:00
2022-03-10 01:33:29 -08:00
/// Clients want to talk to admin database.
2022-02-25 18:20:15 -08:00
admin : bool ,
2022-03-04 17:04:27 -08:00
2022-03-10 01:33:29 -08:00
/// Last address the client talked to.
2022-03-04 17:04:27 -08:00
last_address_id : Option < usize > ,
2022-03-10 01:33:29 -08:00
/// Last server process id we talked to.
2022-03-04 17:04:27 -08:00
last_server_id : Option < i32 > ,
2022-07-27 21:47:55 -05:00
2022-08-22 11:52:34 -07:00
/// Connected to server
connected_to_server : bool ,
2022-08-09 14:18:27 -05:00
/// Name of the server pool for this client (This comes from the database name in the connection string)
2022-08-25 06:40:56 -07:00
pool_name : String ,
2022-08-08 19:01:24 -04:00
2022-08-09 14:18:27 -05:00
/// Postgres user for this client (This comes from the user in the connection string)
2022-08-25 06:40:56 -07:00
username : String ,
2022-08-09 14:18:27 -05:00
2022-09-01 22:06:55 -05:00
/// Application name for this client (defaults to pgcat)
application_name : String ,
2022-08-09 14:18:27 -05:00
/// Used to notify clients about an impending shutdown
2022-08-25 06:40:56 -07:00
shutdown : Receiver < ( ) > ,
2022-02-03 13:35:40 -08:00
}
2022-06-27 16:45:41 -07:00
/// Client entrypoint.
pub async fn client_entrypoint (
2022-06-27 15:52:01 -07:00
mut stream : TcpStream ,
client_server_map : ClientServerMap ,
2022-08-25 06:40:56 -07:00
shutdown : Receiver < ( ) > ,
2022-09-05 03:21:06 -05:00
drain : Sender < i32 > ,
2022-08-25 06:40:56 -07:00
admin_only : bool ,
2022-11-16 22:15:47 -08:00
tls_certificate : Option < String > ,
log_client_connections : bool ,
2022-06-27 15:52:01 -07:00
) -> Result < ( ) , Error > {
2022-06-27 16:45:41 -07:00
// Figure out if the client wants TLS or not.
let addr = stream . peer_addr ( ) . unwrap ( ) ;
2022-06-27 15:52:01 -07:00
match get_startup ::< TcpStream > ( & mut stream ) . await {
2022-06-27 16:45:41 -07:00
// Client requested a TLS connection.
Ok ( ( ClientConnectionType ::Tls , _ ) ) = > {
// TLS settings are configured, will setup TLS now.
2022-11-16 22:15:47 -08:00
if tls_certificate ! = None {
2022-06-27 16:45:41 -07:00
debug! ( " Accepting TLS request " ) ;
let mut yes = BytesMut ::new ( ) ;
yes . put_u8 ( b 'S' ) ;
write_all ( & mut stream , yes ) . await ? ;
// Negotiate TLS.
2022-08-25 06:40:56 -07:00
match startup_tls ( stream , client_server_map , shutdown , admin_only ) . await {
2022-06-27 16:45:41 -07:00
Ok ( mut client ) = > {
2022-11-16 22:15:47 -08:00
if log_client_connections {
info! ( " Client {:?} connected (TLS) " , addr ) ;
} else {
debug! ( " Client {:?} connected (TLS) " , addr ) ;
}
2022-06-27 16:45:41 -07:00
2022-08-25 06:40:56 -07:00
if ! client . is_admin ( ) {
let _ = drain . send ( 1 ) . await ;
}
let result = client . handle ( ) . await ;
if ! client . is_admin ( ) {
let _ = drain . send ( - 1 ) . await ;
}
result
2022-06-27 16:45:41 -07:00
}
Err ( err ) = > Err ( err ) ,
}
}
// TLS is not configured, we cannot offer it.
else {
// Rejecting client request for TLS.
let mut no = BytesMut ::new ( ) ;
no . put_u8 ( b 'N' ) ;
write_all ( & mut stream , no ) . await ? ;
// Attempting regular startup. Client can disconnect now
// if they choose.
match get_startup ::< TcpStream > ( & mut stream ) . await {
// Client accepted unencrypted connection.
Ok ( ( ClientConnectionType ::Startup , bytes ) ) = > {
let ( read , write ) = split ( stream ) ;
// Continue with regular startup.
2022-08-08 19:01:24 -04:00
match Client ::startup (
read ,
write ,
addr ,
bytes ,
client_server_map ,
2022-08-25 06:40:56 -07:00
shutdown ,
admin_only ,
2022-08-08 19:01:24 -04:00
)
. await
{
2022-06-27 16:45:41 -07:00
Ok ( mut client ) = > {
2022-11-16 22:15:47 -08:00
if log_client_connections {
info! ( " Client {:?} connected (plain) " , addr ) ;
} else {
debug! ( " Client {:?} connected (plain) " , addr ) ;
}
2022-06-27 16:45:41 -07:00
2022-08-25 06:40:56 -07:00
if ! client . is_admin ( ) {
let _ = drain . send ( 1 ) . await ;
}
let result = client . handle ( ) . await ;
if ! client . is_admin ( ) {
let _ = drain . send ( - 1 ) . await ;
}
result
2022-06-27 16:45:41 -07:00
}
Err ( err ) = > Err ( err ) ,
}
}
// Client probably disconnected rejecting our plain text connection.
2022-11-17 09:24:39 -08:00
Ok ( ( ClientConnectionType ::Tls , _ ) )
| Ok ( ( ClientConnectionType ::CancelQuery , _ ) ) = > Err ( Error ::ProtocolSyncError (
format! ( " Bad postgres client (plain) " ) ,
) ) ,
Err ( err ) = > Err ( err ) ,
2022-06-27 16:45:41 -07:00
}
2022-06-27 15:52:01 -07:00
}
}
2022-02-03 13:35:40 -08:00
2022-06-27 16:45:41 -07:00
// Client wants to use plain connection without encryption.
2022-06-27 15:52:01 -07:00
Ok ( ( ClientConnectionType ::Startup , bytes ) ) = > {
let ( read , write ) = split ( stream ) ;
2022-06-27 16:45:41 -07:00
// Continue with regular startup.
2022-08-08 19:01:24 -04:00
match Client ::startup (
read ,
write ,
addr ,
bytes ,
client_server_map ,
2022-08-25 06:40:56 -07:00
shutdown ,
admin_only ,
2022-08-08 19:01:24 -04:00
)
. await
{
2022-06-27 16:45:41 -07:00
Ok ( mut client ) = > {
2022-11-16 22:15:47 -08:00
if log_client_connections {
info! ( " Client {:?} connected (plain) " , addr ) ;
} else {
debug! ( " Client {:?} connected (plain) " , addr ) ;
}
2022-06-27 16:45:41 -07:00
2022-09-05 04:02:49 -04:00
if ! client . is_admin ( ) {
2022-08-25 06:40:56 -07:00
let _ = drain . send ( 1 ) . await ;
}
let result = client . handle ( ) . await ;
if ! client . is_admin ( ) {
let _ = drain . send ( - 1 ) . await ;
}
result
2022-06-27 16:45:41 -07:00
}
2022-06-27 15:52:01 -07:00
Err ( err ) = > Err ( err ) ,
}
}
2022-02-22 19:26:08 -08:00
2022-06-27 16:45:41 -07:00
// Client wants to cancel a query.
2022-06-27 15:52:01 -07:00
Ok ( ( ClientConnectionType ::CancelQuery , bytes ) ) = > {
2022-06-27 16:45:41 -07:00
let ( read , write ) = split ( stream ) ;
// Continue with cancel query request.
2022-08-25 06:40:56 -07:00
match Client ::cancel ( read , write , addr , bytes , client_server_map , shutdown ) . await {
2022-06-27 16:45:41 -07:00
Ok ( mut client ) = > {
info! ( " Client {:?} issued a cancel query request " , addr ) ;
2022-09-05 04:02:49 -04:00
if ! client . is_admin ( ) {
2022-08-25 06:40:56 -07:00
let _ = drain . send ( 1 ) . await ;
}
let result = client . handle ( ) . await ;
if ! client . is_admin ( ) {
let _ = drain . send ( - 1 ) . await ;
}
result
2022-06-27 16:45:41 -07:00
}
Err ( err ) = > Err ( err ) ,
}
2022-06-27 15:52:01 -07:00
}
2022-02-03 13:35:40 -08:00
2022-06-27 16:45:41 -07:00
// Something failed, probably the socket.
2022-06-27 15:52:01 -07:00
Err ( err ) = > Err ( err ) ,
}
}
2022-02-03 13:35:40 -08:00
2022-06-27 16:45:41 -07:00
/// Handle the first message the client sends.
2022-06-27 15:52:01 -07:00
async fn get_startup < S > ( stream : & mut S ) -> Result < ( ClientConnectionType , BytesMut ) , Error >
where
S : tokio ::io ::AsyncRead + std ::marker ::Unpin + tokio ::io ::AsyncWrite ,
{
// Get startup message length.
let len = match stream . read_i32 ( ) . await {
Ok ( len ) = > len ,
Err ( _ ) = > return Err ( Error ::ClientBadStartup ) ,
} ;
// Get the rest of the message.
let mut startup = vec! [ 0 u8 ; len as usize - 4 ] ;
match stream . read_exact ( & mut startup ) . await {
Ok ( _ ) = > ( ) ,
Err ( _ ) = > return Err ( Error ::ClientBadStartup ) ,
} ;
let mut bytes = BytesMut ::from ( & startup [ .. ] ) ;
let code = bytes . get_i32 ( ) ;
match code {
// Client is requesting SSL (TLS).
SSL_REQUEST_CODE = > Ok ( ( ClientConnectionType ::Tls , bytes ) ) ,
// Client wants to use plain text, requesting regular startup.
PROTOCOL_VERSION_NUMBER = > Ok ( ( ClientConnectionType ::Startup , bytes ) ) ,
// Client is requesting to cancel a running query (plain text connection).
CANCEL_REQUEST_CODE = > Ok ( ( ClientConnectionType ::CancelQuery , bytes ) ) ,
2022-06-27 16:45:41 -07:00
// Something else, probably something is wrong and it's not our fault,
// e.g. badly implemented Postgres client.
2022-11-17 09:24:39 -08:00
_ = > Err ( Error ::ProtocolSyncError ( format! (
" Unexpected startup code: {} " ,
code
) ) ) ,
2022-06-27 15:52:01 -07:00
}
}
2022-02-04 09:28:52 -08:00
2022-08-11 17:42:40 -04:00
/// Handle TLS connection negotiation.
2022-06-27 15:52:01 -07:00
pub async fn startup_tls (
2022-06-27 16:45:41 -07:00
stream : TcpStream ,
2022-06-27 15:52:01 -07:00
client_server_map : ClientServerMap ,
2022-08-25 06:40:56 -07:00
shutdown : Receiver < ( ) > ,
admin_only : bool ,
2022-06-27 15:52:01 -07:00
) -> Result < Client < ReadHalf < TlsStream < TcpStream > > , WriteHalf < TlsStream < TcpStream > > > , Error > {
// Negotiate TLS.
2022-06-27 16:45:41 -07:00
let tls = Tls ::new ( ) ? ;
let addr = stream . peer_addr ( ) . unwrap ( ) ;
2022-06-27 15:52:01 -07:00
let mut stream = match tls . acceptor . accept ( stream ) . await {
Ok ( stream ) = > stream ,
2022-06-27 16:45:41 -07:00
2022-08-11 17:42:40 -04:00
// TLS negotiation failed.
2022-06-27 16:45:41 -07:00
Err ( err ) = > {
error! ( " TLS negotiation failed: {:?} " , err ) ;
2022-06-27 17:01:40 -07:00
return Err ( Error ::TlsError ) ;
2022-06-27 16:45:41 -07:00
}
2022-06-27 15:52:01 -07:00
} ;
2022-08-11 17:42:40 -04:00
// TLS negotiation successful.
2022-06-27 16:45:41 -07:00
// Continue with regular startup using encrypted connection.
2022-06-27 15:52:01 -07:00
match get_startup ::< TlsStream < TcpStream > > ( & mut stream ) . await {
2022-06-27 16:45:41 -07:00
// Got good startup message, proceeding like normal except we
// are encrypted now.
2022-06-27 15:52:01 -07:00
Ok ( ( ClientConnectionType ::Startup , bytes ) ) = > {
let ( read , write ) = split ( stream ) ;
2022-06-27 16:45:41 -07:00
2022-08-08 19:01:24 -04:00
Client ::startup (
read ,
write ,
addr ,
bytes ,
client_server_map ,
2022-08-25 06:40:56 -07:00
shutdown ,
admin_only ,
2022-08-08 19:01:24 -04:00
)
. await
2022-06-27 15:52:01 -07:00
}
2022-06-27 16:45:41 -07:00
// Bad Postgres client.
2022-11-17 09:24:39 -08:00
Ok ( ( ClientConnectionType ::Tls , _ ) ) | Ok ( ( ClientConnectionType ::CancelQuery , _ ) ) = > Err (
Error ::ProtocolSyncError ( format! ( " Bad postgres client (tls) " ) ) ,
) ,
Err ( err ) = > Err ( err ) ,
2022-06-27 15:52:01 -07:00
}
}
2022-02-04 09:28:52 -08:00
2022-06-27 15:52:01 -07:00
impl < S , T > Client < S , T >
where
S : tokio ::io ::AsyncRead + std ::marker ::Unpin ,
T : tokio ::io ::AsyncWrite + std ::marker ::Unpin ,
{
2022-08-25 06:40:56 -07:00
/// Returns the value of the client's `admin` flag.
pub fn is_admin ( & self ) -> bool {
self . admin
}
2022-06-27 16:45:41 -07:00
/// Handle Postgres client startup after TLS negotiation is complete
/// or over plain text.
pub async fn startup (
2022-06-27 15:52:01 -07:00
mut read : S ,
mut write : T ,
2022-06-27 16:45:41 -07:00
addr : std ::net ::SocketAddr ,
2022-06-27 15:52:01 -07:00
bytes : BytesMut , // The rest of the startup message.
client_server_map : ClientServerMap ,
2022-08-25 06:40:56 -07:00
shutdown : Receiver < ( ) > ,
admin_only : bool ,
2022-06-27 15:52:01 -07:00
) -> Result < Client < S , T > , Error > {
let stats = get_reporter ( ) ;
let parameters = parse_startup ( bytes . clone ( ) ) ? ;
2022-08-25 06:40:56 -07:00
2022-11-16 18:49:04 -08:00
// This parameter is mandatory by the protocol.
2022-08-25 06:40:56 -07:00
let username = match parameters . get ( " user " ) {
2022-07-27 21:47:55 -05:00
Some ( user ) = > user ,
2022-11-17 09:24:39 -08:00
None = > {
return Err ( Error ::ClientError (
" Missing user parameter on client startup " . to_string ( ) ,
) )
}
2022-07-27 21:47:55 -05:00
} ;
2022-11-16 18:49:04 -08:00
let pool_name = match parameters . get ( " database " ) {
Some ( db ) = > db ,
None = > username ,
} ;
2022-09-01 22:06:55 -05:00
let application_name = match parameters . get ( " application_name " ) {
Some ( application_name ) = > application_name ,
None = > " pgcat " ,
} ;
2022-07-27 21:47:55 -05:00
let admin = [ " pgcat " , " pgbouncer " ]
. iter ( )
2022-11-10 02:04:31 +08:00
. filter ( | db | * db = = pool_name )
2022-07-27 21:47:55 -05:00
. count ( )
= = 1 ;
2022-06-20 06:15:54 -07:00
2022-08-25 06:40:56 -07:00
// Kick any client that's not admin while we're in admin-only mode.
if ! admin & & admin_only {
debug! (
" Rejecting non-admin connection to {} when in admin only mode " ,
pool_name
) ;
error_response_terminal (
& mut write ,
2022-11-10 02:04:31 +08:00
" terminating connection due to administrator command " ,
2022-08-25 06:40:56 -07:00
)
. await ? ;
return Err ( Error ::ShuttingDown ) ;
}
2022-06-27 15:52:01 -07:00
// Generate random backend ID and secret key
let process_id : i32 = rand ::random ( ) ;
let secret_key : i32 = rand ::random ( ) ;
2022-06-20 06:15:54 -07:00
2022-06-27 15:52:01 -07:00
// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge ( & mut write ) . await ? ;
2022-06-20 06:15:54 -07:00
2022-06-27 15:52:01 -07:00
let code = match read . read_u8 ( ) . await {
Ok ( p ) = > p ,
2023-02-07 15:16:28 -05:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading password code from client {{ username: {:?} , pool_name: {:?} , application_name: {:?} }} " , username , pool_name , application_name ) ) ) ,
2022-06-27 15:52:01 -07:00
} ;
2022-06-20 06:15:54 -07:00
2022-06-27 15:52:01 -07:00
// PasswordMessage
if code as char ! = 'p' {
2022-11-17 09:24:39 -08:00
return Err ( Error ::ProtocolSyncError ( format! (
" Expected p, got {} " ,
code as char
) ) ) ;
2022-06-27 15:52:01 -07:00
}
2022-06-20 06:15:54 -07:00
2022-06-27 15:52:01 -07:00
let len = match read . read_i32 ( ) . await {
Ok ( len ) = > len ,
2023-02-07 15:16:28 -05:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading password message length from client {{ username: {:?} , pool_name: {:?} , application_name: {:?} }} " , username , pool_name , application_name ) ) ) ,
2022-06-27 15:52:01 -07:00
} ;
2022-06-20 06:15:54 -07:00
2022-06-27 15:52:01 -07:00
let mut password_response = vec! [ 0 u8 ; ( len - 4 ) as usize ] ;
2022-06-20 06:15:54 -07:00
2022-06-27 15:52:01 -07:00
match read . read_exact ( & mut password_response ) . await {
Ok ( _ ) = > ( ) ,
2023-02-07 15:16:28 -05:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading password message from client {{ username: {:?} , pool_name: {:?} , application_name: {:?} }} " , username , pool_name , application_name ) ) ) ,
2022-06-27 15:52:01 -07:00
} ;
2022-02-04 09:28:52 -08:00
2022-08-25 06:40:56 -07:00
// Authenticate admin user.
2022-08-09 14:18:27 -05:00
let ( transaction_mode , server_info ) = if admin {
2022-11-17 09:22:12 -08:00
let config = get_config ( ) ;
2022-07-27 21:47:55 -05:00
// Compare server and client hashes.
2022-08-25 06:40:56 -07:00
let password_hash = md5_hash_password (
& config . general . admin_username ,
& config . general . admin_password ,
& salt ,
) ;
2022-07-27 21:47:55 -05:00
if password_hash ! = password_response {
2023-02-07 15:16:28 -05:00
warn! ( " Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }} " , username , pool_name , application_name ) ;
2022-08-25 06:40:56 -07:00
wrong_password ( & mut write , username ) . await ? ;
2023-02-07 15:16:28 -05:00
return Err ( Error ::ClientError ( format! ( " Invalid password {{ username: {:?} , pool_name: {:?} , application_name: {:?} }} " , username , pool_name , application_name ) ) ) ;
2022-07-27 21:47:55 -05:00
}
2022-08-09 14:18:27 -05:00
( false , generate_server_info_for_admin ( ) )
2022-08-25 06:40:56 -07:00
}
// Authenticate normal user.
else {
2023-01-28 15:36:35 -08:00
let mut pool = match get_pool ( pool_name , username ) {
2022-07-27 21:47:55 -05:00
Some ( pool ) = > pool ,
None = > {
error_response (
& mut write ,
& format! (
" No pool configured for database: {:?}, user: {:?} " ,
2022-08-25 06:40:56 -07:00
pool_name , username
2022-07-27 21:47:55 -05:00
) ,
)
. await ? ;
2022-08-25 06:40:56 -07:00
2023-02-07 15:16:28 -05:00
return Err ( Error ::ClientError ( format! ( " Invalid pool name {{ username: {:?} , pool_name: {:?} , application_name: {:?} }} " , username , pool_name , application_name ) ) ) ;
2022-07-27 21:47:55 -05:00
}
} ;
2022-08-25 06:40:56 -07:00
2022-07-27 21:47:55 -05:00
// Compare server and client hashes.
2022-11-10 02:04:31 +08:00
let password_hash = md5_hash_password ( username , & pool . settings . user . password , & salt ) ;
2022-02-03 13:35:40 -08:00
2022-07-27 21:47:55 -05:00
if password_hash ! = password_response {
2023-02-07 15:16:28 -05:00
warn! ( " Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }} " , username , pool_name , application_name ) ;
2022-08-25 06:40:56 -07:00
wrong_password ( & mut write , username ) . await ? ;
2023-02-07 15:16:28 -05:00
return Err ( Error ::ClientError ( format! ( " Invalid password {{ username: {:?} , pool_name: {:?} , application_name: {:?} }} " , username , pool_name , application_name ) ) ) ;
2022-07-27 21:47:55 -05:00
}
2022-08-25 06:40:56 -07:00
let transaction_mode = pool . settings . pool_mode = = PoolMode ::Transaction ;
2023-01-28 15:36:35 -08:00
// If the pool hasn't been validated yet,
// connect to the servers and figure out what's what.
if ! pool . validated ( ) {
match pool . validate ( ) . await {
Ok ( _ ) = > ( ) ,
Err ( err ) = > {
error_response (
& mut write ,
& format! (
" Pool down for database: {:?}, user: {:?} " ,
pool_name , username
) ,
)
. await ? ;
return Err ( Error ::ClientError ( format! ( " Pool down: {:?} " , err ) ) ) ;
}
}
}
2022-08-25 06:40:56 -07:00
( transaction_mode , pool . server_info ( ) )
2022-07-31 21:52:23 -05:00
} ;
2022-06-27 15:52:01 -07:00
debug! ( " Password authentication successful " ) ;
auth_ok ( & mut write ) . await ? ;
2022-07-31 21:52:23 -05:00
write_all ( & mut write , server_info ) . await ? ;
2022-06-27 15:52:01 -07:00
backend_key_data ( & mut write , process_id , secret_key ) . await ? ;
ready_for_query ( & mut write ) . await ? ;
trace! ( " Startup OK " ) ;
2022-11-10 02:04:31 +08:00
Ok ( Client {
2022-06-27 15:52:01 -07:00
read : BufReader ::new ( read ) ,
2022-11-10 02:04:31 +08:00
write ,
2022-06-27 16:45:41 -07:00
addr ,
2022-06-27 15:52:01 -07:00
buffer : BytesMut ::with_capacity ( 8196 ) ,
cancel_mode : false ,
2022-08-25 06:40:56 -07:00
transaction_mode ,
process_id ,
secret_key ,
client_server_map ,
2022-07-27 21:47:55 -05:00
parameters : parameters . clone ( ) ,
2022-11-10 02:04:31 +08:00
stats ,
admin ,
2022-06-27 15:52:01 -07:00
last_address_id : None ,
last_server_id : None ,
2022-08-25 06:40:56 -07:00
pool_name : pool_name . clone ( ) ,
username : username . clone ( ) ,
2022-09-01 22:06:55 -05:00
application_name : application_name . to_string ( ) ,
2022-08-25 06:40:56 -07:00
shutdown ,
2022-08-22 11:52:34 -07:00
connected_to_server : false ,
2022-11-10 02:04:31 +08:00
} )
2022-02-03 13:35:40 -08:00
}
2022-02-03 13:54:07 -08:00
2022-06-27 16:45:41 -07:00
/// Handle cancel request.
pub async fn cancel (
read : S ,
write : T ,
addr : std ::net ::SocketAddr ,
mut bytes : BytesMut , // The rest of the startup message.
client_server_map : ClientServerMap ,
2022-08-25 06:40:56 -07:00
shutdown : Receiver < ( ) > ,
2022-06-27 16:45:41 -07:00
) -> Result < Client < S , T > , Error > {
let process_id = bytes . get_i32 ( ) ;
let secret_key = bytes . get_i32 ( ) ;
2022-11-10 02:04:31 +08:00
Ok ( Client {
2022-06-27 16:45:41 -07:00
read : BufReader ::new ( read ) ,
2022-11-10 02:04:31 +08:00
write ,
2022-06-27 16:45:41 -07:00
addr ,
buffer : BytesMut ::with_capacity ( 8196 ) ,
cancel_mode : true ,
2022-07-27 21:47:55 -05:00
transaction_mode : false ,
2022-08-25 06:40:56 -07:00
process_id ,
secret_key ,
client_server_map ,
2022-06-27 16:45:41 -07:00
parameters : HashMap ::new ( ) ,
2022-07-27 21:47:55 -05:00
stats : get_reporter ( ) ,
2022-06-27 16:45:41 -07:00
admin : false ,
last_address_id : None ,
last_server_id : None ,
2022-08-25 06:40:56 -07:00
pool_name : String ::from ( " undefined " ) ,
username : String ::from ( " undefined " ) ,
2022-09-01 22:06:55 -05:00
application_name : String ::from ( " undefined " ) ,
2022-08-25 06:40:56 -07:00
shutdown ,
2022-08-22 11:52:34 -07:00
connected_to_server : false ,
2022-11-10 02:04:31 +08:00
} )
2022-06-27 16:45:41 -07:00
}
2022-06-27 15:52:01 -07:00
2022-03-10 01:33:29 -08:00
/// Handle a connected and authenticated client.
2022-06-24 14:52:38 -07:00
pub async fn handle ( & mut self ) -> Result < ( ) , Error > {
2022-02-15 22:45:45 -08:00
// The client wants to cancel a query it has issued previously.
2022-02-04 09:28:52 -08:00
if self . cancel_mode {
2022-02-24 08:44:41 -08:00
trace! ( " Sending CancelRequest " ) ;
2022-02-22 19:26:08 -08:00
2022-02-05 10:02:13 -08:00
let ( process_id , secret_key , address , port ) = {
2022-02-24 08:44:41 -08:00
let guard = self . client_server_map . lock ( ) ;
2022-02-15 22:45:45 -08:00
2022-02-04 16:01:35 -08:00
match guard . get ( & ( self . process_id , self . secret_key ) ) {
2022-02-06 10:48:14 -08:00
// Drop the mutex as soon as possible.
2022-02-15 22:45:45 -08:00
// We found the server the client is using for its query
// that it wants to cancel.
2022-11-10 02:04:31 +08:00
Some ( ( process_id , secret_key , address , port ) ) = > {
( * process_id , * secret_key , address . clone ( ) , * port )
}
2022-02-15 22:45:45 -08:00
// The client doesn't know / got the wrong server,
// we're closing the connection for security reasons.
2022-02-04 16:01:35 -08:00
None = > return Ok ( ( ) ) ,
}
} ;
2022-02-15 22:45:45 -08:00
// Opens a new separate connection to the server, sends the backend_id
// and secret_key and then closes it for security reasons. No other interactions
// take place.
2022-11-10 02:04:31 +08:00
return Server ::cancel ( & address , port , process_id , secret_key ) . await ;
2022-02-04 09:28:52 -08:00
}
2022-06-24 14:52:38 -07:00
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
2022-08-09 14:18:27 -05:00
let mut query_router = QueryRouter ::new ( ) ;
2022-09-14 10:20:41 -05:00
self . stats . client_register (
self . process_id ,
self . pool_name . clone ( ) ,
self . username . clone ( ) ,
self . application_name . clone ( ) ,
) ;
2022-02-19 13:57:35 -08:00
2022-02-16 22:52:11 -08:00
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
2022-03-10 01:33:29 -08:00
// or issue commands for our sharding and server selection protocol.
2022-02-03 13:54:07 -08:00
loop {
2022-06-24 14:52:38 -07:00
trace! (
" Client idle, waiting for message, transaction mode: {} " ,
self . transaction_mode
) ;
2022-02-22 19:26:08 -08:00
2022-02-06 10:48:14 -08:00
// Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol).
// We can parse it here before grabbing a server from the pool,
2022-03-10 01:33:29 -08:00
// in case the client is sending some custom protocol messages, e.g.
2022-02-09 06:51:31 -08:00
// SET SHARDING KEY TO 'bigint';
2022-08-08 19:01:24 -04:00
2022-10-13 22:33:12 -04:00
let message = tokio ::select! {
2022-08-25 06:40:56 -07:00
_ = self . shutdown . recv ( ) = > {
if ! self . admin {
error_response_terminal (
& mut self . write ,
2022-11-10 02:04:31 +08:00
" terminating connection due to administrator command "
2022-08-25 06:40:56 -07:00
) . await ? ;
return Ok ( ( ) )
}
// Admin clients ignore shutdown.
else {
read_message ( & mut self . read ) . await ?
}
2022-08-08 19:01:24 -04:00
} ,
message_result = read_message ( & mut self . read ) = > message_result ?
} ;
2022-02-06 10:48:14 -08:00
2022-09-01 17:02:39 -05:00
match message [ 0 ] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
// we'll be able to allocate a connection. Also, clients do not expect
// the server to respond to these messages so even if we were not able to
// allocate a connection, we wouldn't be able to send back an error message
// to the client so we buffer them and defer the decision to error out or not
// to when we get the S message
'P' | 'B' | 'D' | 'E' = > {
self . buffer . put ( & message [ .. ] ) ;
continue ;
}
'X' = > {
debug! ( " Client disconnecting " ) ;
return Ok ( ( ) ) ;
}
_ = > ( ) ,
}
2022-09-07 00:22:31 -05:00
// Handle admin database queries.
if self . admin {
debug! ( " Handling admin command " ) ;
handle_admin ( & mut self . write , message , self . client_server_map . clone ( ) ) . await ? ;
continue ;
}
2022-08-09 14:18:27 -05:00
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
2023-01-28 15:36:35 -08:00
let mut pool = self . get_pool ( ) . await ? ;
// Check if the pool is paused and wait until it's resumed.
if pool . wait_paused ( ) . await {
// Refresh pool information, something might have changed.
pool = self . get_pool ( ) . await ? ;
}
2022-09-23 11:08:38 -05:00
2022-08-09 14:18:27 -05:00
query_router . update_pool_settings ( pool . settings . clone ( ) ) ;
2023-01-28 15:36:35 -08:00
2022-06-24 14:52:38 -07:00
let current_shard = query_router . shard ( ) ;
2022-03-10 01:33:29 -08:00
// Handle all custom protocol commands, if any.
2023-01-19 10:19:49 -05:00
match query_router . try_execute_command ( & message ) {
2022-03-10 01:33:29 -08:00
// Normal query, not a custom command.
2022-06-24 15:14:31 -07:00
None = > {
if query_router . query_parser_enabled ( ) {
2023-01-19 10:19:49 -05:00
query_router . infer ( & message ) ;
2022-06-24 15:14:31 -07:00
}
}
2022-02-19 08:57:24 -08:00
2022-03-10 01:33:29 -08:00
// SET SHARD TO
2022-02-24 12:16:24 -08:00
Some ( ( Command ::SetShard , _ ) ) = > {
2022-06-24 14:52:38 -07:00
// Selected shard is not configured.
if query_router . shard ( ) > = pool . shards ( ) {
// Set the shard back to what it was.
query_router . set_shard ( current_shard ) ;
error_response (
& mut self . write ,
& format! (
" shard {} is more than configured {}, staying on shard {} " ,
query_router . shard ( ) ,
pool . shards ( ) ,
current_shard ,
) ,
)
. await ? ;
} else {
custom_protocol_response_ok ( & mut self . write , " SET SHARD " ) . await ? ;
}
continue ;
}
// SET PRIMARY READS TO
Some ( ( Command ::SetPrimaryReads , _ ) ) = > {
custom_protocol_response_ok ( & mut self . write , " SET PRIMARY READS " ) . await ? ;
2022-02-19 08:57:24 -08:00
continue ;
}
2022-03-10 01:33:29 -08:00
// SET SHARDING KEY TO
2022-02-24 12:16:24 -08:00
Some ( ( Command ::SetShardingKey , _ ) ) = > {
2022-03-10 01:33:29 -08:00
custom_protocol_response_ok ( & mut self . write , " SET SHARDING KEY " ) . await ? ;
2022-02-24 12:16:24 -08:00
continue ;
}
2022-03-10 01:33:29 -08:00
// SET SERVER ROLE TO
2022-02-19 08:57:24 -08:00
Some ( ( Command ::SetServerRole , _ ) ) = > {
custom_protocol_response_ok ( & mut self . write , " SET SERVER ROLE " ) . await ? ;
continue ;
}
2022-03-10 01:33:29 -08:00
// SHOW SERVER ROLE
2022-02-19 08:57:24 -08:00
Some ( ( Command ::ShowServerRole , value ) ) = > {
show_response ( & mut self . write , " server role " , & value ) . await ? ;
continue ;
}
2022-03-10 01:33:29 -08:00
// SHOW SHARD
2022-02-19 08:57:24 -08:00
Some ( ( Command ::ShowShard , value ) ) = > {
show_response ( & mut self . write , " shard " , & value ) . await ? ;
continue ;
}
2022-06-24 14:52:38 -07:00
// SHOW PRIMARY READS
Some ( ( Command ::ShowPrimaryReads , value ) ) = > {
show_response ( & mut self . write , " primary reads " , & value ) . await ? ;
continue ;
}
} ;
2022-02-06 10:48:14 -08:00
2022-02-22 19:26:08 -08:00
debug! ( " Waiting for connection from pool " ) ;
2022-03-10 01:33:29 -08:00
// Grab a server from the pool.
2022-03-04 17:04:27 -08:00
let connection = match pool
2022-08-22 00:15:20 -05:00
. get ( query_router . shard ( ) , query_router . role ( ) , self . process_id )
2022-03-04 17:04:27 -08:00
. await
{
2022-02-22 19:26:08 -08:00
Ok ( conn ) = > {
debug! ( " Got connection from pool " ) ;
conn
}
2022-02-09 21:19:14 -08:00
Err ( err ) = > {
2022-09-01 17:02:39 -05:00
// Client is attempting to get results from the server,
// but we were unable to grab a connection from the pool
// We'll send back an error message and clean the extended
// protocol buffer
if message [ 0 ] as char = = 'S' {
error! ( " Got Sync message but failed to get a connection from the pool " ) ;
self . buffer . clear ( ) ;
}
error_response ( & mut self . write , " could not get connection from the pool " )
. await ? ;
2022-09-07 10:24:07 -05:00
error! ( " Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \" {:?} \" , error: \" {:?} \" }} " ,
self . pool_name . clone ( ) , self . username . clone ( ) , query_router . shard ( ) , query_router . role ( ) , err ) ;
2022-02-16 22:52:11 -08:00
continue ;
2022-02-09 21:19:14 -08:00
}
} ;
2022-02-15 22:45:45 -08:00
let mut reference = connection . 0 ;
2022-03-04 17:04:27 -08:00
let address = connection . 1 ;
2022-02-15 22:45:45 -08:00
let server = & mut * reference ;
2022-02-03 16:25:05 -08:00
2022-03-10 01:33:29 -08:00
// Server is assigned to the client in case the client wants to
// cancel a query later.
2022-02-04 16:01:35 -08:00
server . claim ( self . process_id , self . secret_key ) ;
2022-08-22 11:52:34 -07:00
self . connected_to_server = true ;
2022-02-03 17:06:19 -08:00
2022-09-14 10:20:41 -05:00
// Update statistics
self . stats
. client_active ( self . process_id , server . server_id ( ) ) ;
2022-03-10 01:33:29 -08:00
2022-03-04 17:04:27 -08:00
self . last_address_id = Some ( address . id ) ;
2022-09-14 10:20:41 -05:00
self . last_server_id = Some ( server . server_id ( ) ) ;
2022-02-20 12:40:09 -08:00
2022-02-22 19:26:08 -08:00
debug! (
2022-06-27 16:45:41 -07:00
" Client {:?} talking to server {:?} " ,
self . addr ,
2022-02-22 19:26:08 -08:00
server . address ( )
) ;
2022-06-03 00:15:50 -07:00
// TODO: investigate other parameters and set them too.
2022-09-01 22:06:55 -05:00
// Set application_name.
server . set_name ( & self . application_name ) . await ? ;
2022-06-03 00:15:50 -07:00
2022-10-13 22:33:12 -04:00
let mut initial_message = Some ( message ) ;
2022-02-15 22:45:45 -08:00
// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
// or until the client disconnects if we are in session mode.
2022-03-10 01:33:29 -08:00
//
// If the client is in session mode, no more custom protocol
// commands will be accepted.
2022-02-03 16:25:05 -08:00
loop {
2022-10-13 22:33:12 -04:00
let message = match initial_message {
None = > {
trace! ( " Waiting for message inside transaction or in session mode " ) ;
2022-02-22 19:26:08 -08:00
2022-10-13 22:33:12 -04:00
match read_message ( & mut self . read ) . await {
Ok ( message ) = > message ,
Err ( err ) = > {
// Client disconnected inside a transaction.
// Clean up the server and re-use it.
server . checkin_cleanup ( ) . await ? ;
2022-02-03 17:06:19 -08:00
2022-10-13 22:33:12 -04:00
return Err ( err ) ;
}
2022-02-06 10:48:14 -08:00
}
2022-02-03 17:06:19 -08:00
}
2022-10-13 22:33:12 -04:00
Some ( message ) = > {
initial_message = None ;
message
}
2022-02-03 17:06:19 -08:00
} ;
2022-02-15 22:45:45 -08:00
// The message will be forwarded to the server intact. We still would like to
// parse it below to figure out what to do with it.
2022-10-13 22:33:12 -04:00
// Safe to unwrap because we know this message has a certain length and has the code
// This reads the first byte without advancing the internal pointer and mutating the bytes
let code = * message . get ( 0 ) . unwrap ( ) as char ;
2022-02-03 16:25:05 -08:00
2022-02-24 08:44:41 -08:00
trace! ( " Message: {} " , code ) ;
2022-02-22 19:26:08 -08:00
2022-02-03 16:25:05 -08:00
match code {
2022-10-13 22:33:12 -04:00
// Query
2022-02-03 16:25:05 -08:00
'Q' = > {
2022-02-22 19:26:08 -08:00
debug! ( " Sending query to server " ) ;
2023-01-17 20:39:55 -05:00
self . send_and_receive_loop ( code , Some ( & message ) , server , & address , & pool )
2022-08-25 06:40:56 -07:00
. await ? ;
2022-02-04 08:26:50 -08:00
2022-02-14 10:00:55 -08:00
if ! server . in_transaction ( ) {
2022-02-15 22:45:45 -08:00
// Report transaction executed statistics.
2022-09-14 10:20:41 -05:00
self . stats . transaction ( self . process_id , server . server_id ( ) ) ;
2022-02-14 10:00:55 -08:00
2022-02-15 22:45:45 -08:00
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
2022-02-14 10:00:55 -08:00
if self . transaction_mode {
break ;
}
2022-02-03 16:25:05 -08:00
}
}
2022-02-15 22:45:45 -08:00
// Terminate
2022-02-03 16:25:05 -08:00
'X' = > {
2022-09-01 22:06:55 -05:00
server . checkin_cleanup ( ) . await ? ;
2022-05-18 16:24:03 -07:00
self . release ( ) ;
2022-02-03 16:25:05 -08:00
return Ok ( ( ) ) ;
}
2022-02-15 22:45:45 -08:00
// Parse
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
2022-02-03 16:25:05 -08:00
'P' = > {
2022-10-13 22:33:12 -04:00
self . buffer . put ( & message [ .. ] ) ;
2022-02-03 16:25:05 -08:00
}
2022-02-15 22:45:45 -08:00
// Bind
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
2022-02-03 16:25:05 -08:00
'B' = > {
2022-10-13 22:33:12 -04:00
self . buffer . put ( & message [ .. ] ) ;
2022-02-03 16:25:05 -08:00
}
2022-02-06 10:48:14 -08:00
// Describe
2022-02-15 22:45:45 -08:00
// Command a client can issue to describe a previously prepared named statement.
2022-02-03 16:25:05 -08:00
'D' = > {
2022-10-13 22:33:12 -04:00
self . buffer . put ( & message [ .. ] ) ;
2022-02-03 16:25:05 -08:00
}
2022-02-15 22:45:45 -08:00
// Execute
// Execute a prepared statement prepared in `P` and bound in `B`.
2022-02-03 16:25:05 -08:00
'E' = > {
2022-10-13 22:33:12 -04:00
self . buffer . put ( & message [ .. ] ) ;
2022-02-03 16:25:05 -08:00
}
2022-02-15 22:45:45 -08:00
// Sync
// Frontend (client) is asking for the query result now.
2022-02-03 16:25:05 -08:00
'S' = > {
2022-02-22 19:26:08 -08:00
debug! ( " Sending query to server " ) ;
2022-10-13 22:33:12 -04:00
self . buffer . put ( & message [ .. ] ) ;
2022-02-09 21:19:14 -08:00
2022-09-07 22:37:17 -05:00
let first_message_code = ( * self . buffer . get ( 0 ) . unwrap_or ( & 0 ) ) as char ;
// Almost certainly true
if first_message_code = = 'P' {
// Message layout
// P followed by 32 int followed by null-terminated statement name
// So message code should be in offset 0 of the buffer, first character
// in prepared statement name would be index 5
let first_char_in_name = * self . buffer . get ( 5 ) . unwrap_or ( & 0 ) ;
if first_char_in_name ! = 0 {
// This is a named prepared statement
// Server connection state will need to be cleared at checkin
server . mark_dirty ( ) ;
}
}
2023-01-17 20:39:55 -05:00
self . send_and_receive_loop ( code , None , server , & address , & pool )
. await ? ;
2022-02-15 22:45:45 -08:00
2022-02-03 16:25:05 -08:00
self . buffer . clear ( ) ;
2022-02-14 10:00:55 -08:00
if ! server . in_transaction ( ) {
2022-09-14 10:20:41 -05:00
self . stats . transaction ( self . process_id , server . server_id ( ) ) ;
2022-02-14 10:00:55 -08:00
2022-03-10 01:33:29 -08:00
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
2022-02-14 10:00:55 -08:00
if self . transaction_mode {
break ;
}
2022-02-03 16:25:05 -08:00
}
}
2022-02-04 08:06:45 -08:00
// CopyData
'd' = > {
2023-01-17 20:39:55 -05:00
self . buffer . put ( & message [ .. ] ) ;
// Want to limit buffer size
if self . buffer . len ( ) > 8196 {
// Forward the data to the server,
self . send_server_message ( server , & self . buffer , & address , & pool )
. await ? ;
self . buffer . clear ( ) ;
}
2022-02-04 08:06:45 -08:00
}
2022-02-15 22:45:45 -08:00
// CopyDone or CopyFail
// Copy is done, successfully or not.
2022-02-04 08:06:45 -08:00
'c' | 'f' = > {
2023-01-17 20:39:55 -05:00
// We may already have some copy data in the buffer, add this message to buffer
self . buffer . put ( & message [ .. ] ) ;
self . send_server_message ( server , & self . buffer , & address , & pool )
2022-08-11 17:42:40 -04:00
. await ? ;
2022-02-15 22:45:45 -08:00
2023-01-17 20:39:55 -05:00
// Clear the buffer
self . buffer . clear ( ) ;
2022-08-25 06:40:56 -07:00
let response = self . receive_server_message ( server , & address , & pool ) . await ? ;
2023-01-17 20:39:55 -05:00
match write_all_half ( & mut self . write , & response ) . await {
2022-02-04 08:06:45 -08:00
Ok ( _ ) = > ( ) ,
Err ( err ) = > {
server . mark_bad ( ) ;
return Err ( err ) ;
}
} ;
2022-02-05 15:23:21 -08:00
2022-02-14 10:00:55 -08:00
if ! server . in_transaction ( ) {
2022-09-14 10:20:41 -05:00
self . stats . transaction ( self . process_id , server . server_id ( ) ) ;
2022-02-14 10:00:55 -08:00
2022-03-10 01:33:29 -08:00
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
2022-02-14 10:00:55 -08:00
if self . transaction_mode {
break ;
}
2022-02-05 15:23:21 -08:00
}
2022-02-04 08:06:45 -08:00
}
2022-02-15 22:45:45 -08:00
// Some unexpected message. We either did not implement the protocol correctly
// or this is not a Postgres client we're talking to.
2022-02-03 16:25:05 -08:00
_ = > {
2022-02-20 22:47:08 -08:00
error! ( " Unexpected code: {} " , code ) ;
2022-02-03 16:25:05 -08:00
}
2022-02-03 15:17:04 -08:00
}
2022-02-03 13:54:07 -08:00
}
2022-02-04 16:01:35 -08:00
2022-02-15 22:45:45 -08:00
// The server is no longer bound to us, we can't cancel its queries anymore.
2022-02-22 19:26:08 -08:00
debug! ( " Releasing server back into the pool " ) ;
2022-09-01 22:06:55 -05:00
server . checkin_cleanup ( ) . await ? ;
2022-09-14 10:20:41 -05:00
self . stats . server_idle ( server . server_id ( ) ) ;
2022-08-22 11:52:34 -07:00
self . connected_to_server = false ;
2022-09-01 22:06:55 -05:00
2022-02-04 16:01:35 -08:00
self . release ( ) ;
2022-09-14 10:20:41 -05:00
self . stats . client_idle ( self . process_id ) ;
2022-02-03 13:54:07 -08:00
}
}
2022-02-04 16:01:35 -08:00
2023-01-28 15:36:35 -08:00
/// Retrieve connection pool, if it exists.
/// Return an error to the client otherwise.
async fn get_pool ( & mut self ) -> Result < ConnectionPool , Error > {
match get_pool ( & self . pool_name , & self . username ) {
Some ( pool ) = > Ok ( pool ) ,
None = > {
error_response (
& mut self . write ,
& format! (
" No pool configured for database: {}, user: {} " ,
self . pool_name , self . username
) ,
)
. await ? ;
Err ( Error ::ClientError ( format! (
" Invalid pool name {{ username: {}, pool_name: {}, application_name: {} }} " ,
self . pool_name , self . username , self . application_name
) ) )
}
}
}
2022-03-10 01:33:29 -08:00
/// Release the server from the client: it can't cancel its queries anymore.
2022-02-11 11:19:40 -08:00
pub fn release ( & self ) {
2022-02-24 08:44:41 -08:00
let mut guard = self . client_server_map . lock ( ) ;
2022-02-04 16:01:35 -08:00
guard . remove ( & ( self . process_id , self . secret_key ) ) ;
}
2022-08-11 17:42:40 -04:00
2022-08-18 08:12:38 -04:00
async fn send_and_receive_loop (
& mut self ,
code : char ,
2023-01-17 20:39:55 -05:00
message : Option < & BytesMut > ,
2022-08-18 08:12:38 -04:00
server : & mut Server ,
address : & Address ,
pool : & ConnectionPool ,
) -> Result < ( ) , Error > {
debug! ( " Sending {} to server " , code ) ;
2023-01-17 20:39:55 -05:00
let message = match message {
Some ( message ) = > message ,
None = > & self . buffer ,
} ;
2022-11-10 02:04:31 +08:00
self . send_server_message ( server , message , address , pool )
2022-08-18 08:12:38 -04:00
. await ? ;
2022-09-15 01:21:45 -05:00
let query_start = Instant ::now ( ) ;
2022-08-18 08:12:38 -04:00
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
2022-11-10 02:04:31 +08:00
let response = self . receive_server_message ( server , address , pool ) . await ? ;
2022-08-18 08:12:38 -04:00
2023-01-17 20:39:55 -05:00
match write_all_half ( & mut self . write , & response ) . await {
2022-08-18 08:12:38 -04:00
Ok ( _ ) = > ( ) ,
Err ( err ) = > {
server . mark_bad ( ) ;
return Err ( err ) ;
}
} ;
if ! server . is_data_available ( ) {
break ;
}
}
// Report query executed statistics.
2022-09-15 01:21:45 -05:00
self . stats . query (
self . process_id ,
server . server_id ( ) ,
Instant ::now ( ) . duration_since ( query_start ) . as_millis ( ) ,
) ;
2022-08-18 08:12:38 -04:00
Ok ( ( ) )
}
2022-08-11 17:42:40 -04:00
async fn send_server_message (
& self ,
server : & mut Server ,
2023-01-17 20:39:55 -05:00
message : & BytesMut ,
2022-08-11 17:42:40 -04:00
address : & Address ,
pool : & ConnectionPool ,
) -> Result < ( ) , Error > {
match server . send ( message ) . await {
Ok ( _ ) = > Ok ( ( ) ) ,
Err ( err ) = > {
2022-08-25 06:40:56 -07:00
pool . ban ( address , self . process_id ) ;
2022-08-11 17:42:40 -04:00
Err ( err )
}
}
}
async fn receive_server_message (
2022-08-13 13:45:58 -07:00
& mut self ,
2022-08-11 17:42:40 -04:00
server : & mut Server ,
address : & Address ,
pool : & ConnectionPool ,
) -> Result < BytesMut , Error > {
2022-08-13 13:45:58 -07:00
if pool . settings . user . statement_timeout > 0 {
match tokio ::time ::timeout (
tokio ::time ::Duration ::from_millis ( pool . settings . user . statement_timeout ) ,
server . recv ( ) ,
)
. await
{
Ok ( result ) = > match result {
Ok ( message ) = > Ok ( message ) ,
Err ( err ) = > {
2022-08-25 06:40:56 -07:00
pool . ban ( address , self . process_id ) ;
2022-08-13 13:45:58 -07:00
error_response_terminal (
& mut self . write ,
& format! ( " error receiving data from server: {:?} " , err ) ,
)
. await ? ;
Err ( err )
}
} ,
Err ( _ ) = > {
error! (
" Statement timeout while talking to {:?} with user {} " ,
address , pool . settings . user . username
) ;
server . mark_bad ( ) ;
2022-08-25 06:40:56 -07:00
pool . ban ( address , self . process_id ) ;
2022-08-13 13:45:58 -07:00
error_response_terminal ( & mut self . write , " pool statement timeout " ) . await ? ;
Err ( Error ::StatementTimeout )
}
}
} else {
match server . recv ( ) . await {
Ok ( message ) = > Ok ( message ) ,
Err ( err ) = > {
2022-08-25 06:40:56 -07:00
pool . ban ( address , self . process_id ) ;
2022-08-13 13:45:58 -07:00
error_response_terminal (
& mut self . write ,
& format! ( " error receiving data from server: {:?} " , err ) ,
)
. await ? ;
Err ( err )
}
2022-08-11 17:42:40 -04:00
}
}
}
2022-02-03 15:17:04 -08:00
}
2022-02-20 12:40:09 -08:00
2022-06-27 15:52:01 -07:00
impl < S , T > Drop for Client < S , T > {
2022-02-20 12:40:09 -08:00
fn drop ( & mut self ) {
2022-06-27 16:45:41 -07:00
let mut guard = self . client_server_map . lock ( ) ;
guard . remove ( & ( self . process_id , self . secret_key ) ) ;
2022-08-22 11:52:34 -07:00
// Dirty shutdown
// TODO: refactor, this is not the best way to handle state management.
2022-09-14 10:20:41 -05:00
self . stats . client_disconnecting ( self . process_id ) ;
if self . connected_to_server & & self . last_server_id . is_some ( ) {
self . stats . server_idle ( self . last_server_id . unwrap ( ) ) ;
2022-03-04 17:04:27 -08:00
}
2022-02-20 12:40:09 -08:00
}
}