2022-03-10 01:33:29 -08:00
/// Implementation of the PostgreSQL server (database) protocol.
/// Here we are pretending to be a Postgres client.
2022-02-20 22:47:08 -08:00
use bytes ::{ Buf , BufMut , BytesMut } ;
2022-09-01 22:06:55 -05:00
use log ::{ debug , error , info , trace , warn } ;
use std ::io ::Read ;
2023-03-28 17:19:37 +02:00
use std ::sync ::Arc ;
2022-08-11 17:42:40 -04:00
use std ::time ::SystemTime ;
2022-02-03 15:17:04 -08:00
use tokio ::io ::{ AsyncReadExt , BufReader } ;
2022-02-15 22:45:45 -08:00
use tokio ::net ::{
tcp ::{ OwnedReadHalf , OwnedWriteHalf } ,
TcpStream ,
} ;
2022-02-03 15:17:04 -08:00
2022-02-15 08:18:01 -08:00
use crate ::config ::{ Address , User } ;
2022-02-15 22:45:45 -08:00
use crate ::constants ::* ;
2022-02-03 15:17:04 -08:00
use crate ::errors ::Error ;
use crate ::messages ::* ;
2023-03-10 06:23:51 -06:00
use crate ::mirrors ::MirroringManager ;
2022-09-20 21:47:32 -04:00
use crate ::pool ::ClientServerMap ;
2022-06-18 18:36:00 -07:00
use crate ::scram ::ScramSha256 ;
2023-03-28 17:19:37 +02:00
use crate ::stats ::ServerStats ;
2022-02-03 15:17:04 -08:00
2022-02-04 09:28:52 -08:00
/// Server state.
2022-02-03 15:17:04 -08:00
pub struct Server {
2022-03-10 01:33:29 -08:00
/// Server host, e.g. localhost,
/// port, e.g. 5432, and role, e.g. primary or replica.
2022-02-15 08:18:01 -08:00
address : Address ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Buffered read socket.
2022-02-03 15:17:04 -08:00
read : BufReader < OwnedReadHalf > ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Unbuffered write socket (our client code buffers).
2022-02-03 15:17:04 -08:00
write : OwnedWriteHalf ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Our server response buffer. We buffer data before we give it to the client.
2022-02-03 15:17:04 -08:00
buffer : BytesMut ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Server information the server sent us over on startup.
2022-02-03 17:48:37 -08:00
server_info : BytesMut ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Backend id and secret key used for query cancellation.
2022-02-20 22:47:08 -08:00
process_id : i32 ,
2022-02-03 17:48:37 -08:00
secret_key : i32 ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Is the server inside a transaction or idle.
2022-02-03 16:25:05 -08:00
in_transaction : bool ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Is there more data for the client to read.
2022-02-04 08:26:50 -08:00
data_available : bool ,
2022-02-08 09:25:59 -08:00
2022-03-10 01:33:29 -08:00
/// Is the server broken? We'll remote it from the pool if so.
2022-02-03 17:06:19 -08:00
bad : bool ,
2022-02-08 09:25:59 -08:00
2022-09-01 22:06:55 -05:00
/// If server connection requires a DISCARD ALL before checkin
needs_cleanup : bool ,
2022-03-10 01:33:29 -08:00
/// Mapping of clients and servers used for query cancellation.
2022-02-04 16:01:35 -08:00
client_server_map : ClientServerMap ,
2022-02-09 20:02:20 -08:00
2022-03-10 01:33:29 -08:00
/// Server connected at.
2022-02-12 10:16:05 -08:00
connected_at : chrono ::naive ::NaiveDateTime ,
2022-02-14 10:00:55 -08:00
2022-03-10 01:33:29 -08:00
/// Reports various metrics, e.g. data sent & received.
2023-03-28 17:19:37 +02:00
stats : Arc < ServerStats > ,
2022-06-05 09:48:06 -07:00
/// Application name using the server at the moment.
application_name : String ,
2022-08-11 17:42:40 -04:00
// Last time that a successful server send or response happened
last_activity : SystemTime ,
2023-03-10 06:23:51 -06:00
mirror_manager : Option < MirroringManager > ,
2022-02-03 15:17:04 -08:00
}
impl Server {
2022-02-04 09:28:52 -08:00
/// Pretend to be the Postgres client and connect to the server given host, port and credentials.
2022-02-15 22:45:45 -08:00
/// Perform the authentication and return the server in a ready for query state.
2022-02-03 15:17:04 -08:00
pub async fn startup (
2022-02-15 08:18:01 -08:00
address : & Address ,
user : & User ,
2022-02-03 15:17:04 -08:00
database : & str ,
2022-02-04 16:01:35 -08:00
client_server_map : ClientServerMap ,
2023-03-28 17:19:37 +02:00
stats : Arc < ServerStats > ,
2022-02-03 15:17:04 -08:00
) -> Result < Server , Error > {
2022-02-15 08:18:01 -08:00
let mut stream =
2022-08-25 06:40:56 -07:00
match TcpStream ::connect ( & format! ( " {} : {} " , & address . host , address . port ) ) . await {
2022-02-15 08:18:01 -08:00
Ok ( stream ) = > stream ,
Err ( err ) = > {
2022-02-20 22:47:08 -08:00
error! ( " Could not connect to server: {} " , err ) ;
2022-11-17 09:24:39 -08:00
return Err ( Error ::SocketError ( format! (
" Could not connect to server: {} " ,
err
) ) ) ;
2022-02-15 08:18:01 -08:00
}
} ;
2023-02-08 11:35:38 -06:00
configure_socket ( & stream ) ;
2022-02-03 15:17:04 -08:00
2022-02-24 08:44:41 -08:00
trace! ( " Sending StartupMessage " ) ;
2022-02-22 19:26:08 -08:00
2022-03-10 01:33:29 -08:00
// StartupMessage
2022-07-27 21:47:55 -05:00
startup ( & mut stream , & user . username , database ) . await ? ;
2022-02-03 15:17:04 -08:00
2022-02-15 22:45:45 -08:00
let mut server_info = BytesMut ::new ( ) ;
2022-02-20 22:47:08 -08:00
let mut process_id : i32 = 0 ;
2022-02-03 17:48:37 -08:00
let mut secret_key : i32 = 0 ;
2022-02-15 22:45:45 -08:00
// We'll be handling multiple packets, but they will all be structured the same.
// We'll loop here until this exchange is complete.
2022-06-18 18:36:00 -07:00
let mut scram = ScramSha256 ::new ( & user . password ) ;
2022-02-03 15:17:04 -08:00
loop {
let code = match stream . read_u8 ( ) . await {
Ok ( code ) = > code as char ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading message code on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 15:17:04 -08:00
} ;
let len = match stream . read_i32 ( ) . await {
Ok ( len ) = > len ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading message len on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 15:17:04 -08:00
} ;
2022-02-24 08:44:41 -08:00
trace! ( " Message: {} " , code ) ;
2022-02-22 19:26:08 -08:00
2022-02-03 15:17:04 -08:00
match code {
2022-02-15 22:45:45 -08:00
// Authentication
2022-02-03 15:17:04 -08:00
'R' = > {
2022-02-15 22:45:45 -08:00
// Determine which kind of authentication is required, if any.
let auth_code = match stream . read_i32 ( ) . await {
Ok ( auth_code ) = > auth_code ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading auth code on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 15:17:04 -08:00
} ;
2022-02-24 08:44:41 -08:00
trace! ( " Auth: {} " , auth_code ) ;
2022-02-22 19:26:08 -08:00
2022-02-15 22:45:45 -08:00
match auth_code {
MD5_ENCRYPTED_PASSWORD = > {
// The salt is 4 bytes.
// See: https://www.postgresql.org/docs/12/protocol-message-formats.html
2022-02-03 15:17:04 -08:00
let mut salt = vec! [ 0 u8 ; 4 ] ;
match stream . read_exact ( & mut salt ) . await {
Ok ( _ ) = > ( ) ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading salt on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 15:17:04 -08:00
} ;
2022-07-27 21:47:55 -05:00
md5_password ( & mut stream , & user . username , & user . password , & salt [ .. ] )
2022-02-15 08:18:01 -08:00
. await ? ;
2022-02-03 15:17:04 -08:00
}
2022-02-15 22:45:45 -08:00
AUTHENTICATION_SUCCESSFUL = > ( ) ,
2022-02-03 15:17:04 -08:00
2022-06-18 18:36:00 -07:00
SASL = > {
debug! ( " Starting SASL authentication " ) ;
let sasl_len = ( len - 8 ) as usize ;
let mut sasl_auth = vec! [ 0 u8 ; sasl_len ] ;
2022-06-20 06:15:54 -07:00
2022-06-18 18:36:00 -07:00
match stream . read_exact ( & mut sasl_auth ) . await {
Ok ( _ ) = > ( ) ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading sasl message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-06-18 18:36:00 -07:00
} ;
let sasl_type = String ::from_utf8_lossy ( & sasl_auth [ .. sasl_len - 2 ] ) ;
if sasl_type = = SCRAM_SHA_256 {
debug! ( " Using {} " , SCRAM_SHA_256 ) ;
2022-06-20 06:15:54 -07:00
// Generate client message.
2022-06-18 18:36:00 -07:00
let sasl_response = scram . message ( ) ;
2022-06-20 06:15:54 -07:00
// SASLInitialResponse (F)
2022-06-18 18:36:00 -07:00
let mut res = BytesMut ::new ( ) ;
res . put_u8 ( b 'p' ) ;
2022-06-20 06:15:54 -07:00
// length + String length + length + length of sasl response
2022-06-18 18:36:00 -07:00
res . put_i32 (
2022-06-20 06:15:54 -07:00
4 // i32 size
+ SCRAM_SHA_256 . len ( ) as i32 // length of SASL version string,
+ 1 // Null terminator for the SASL version string,
+ 4 // i32 size
+ sasl_response . len ( ) as i32 , // length of SASL response
2022-06-18 18:36:00 -07:00
) ;
2022-06-20 06:15:54 -07:00
2022-11-10 02:04:31 +08:00
res . put_slice ( format! ( " {} \0 " , SCRAM_SHA_256 ) . as_bytes ( ) ) ;
2022-06-18 18:36:00 -07:00
res . put_i32 ( sasl_response . len ( ) as i32 ) ;
res . put ( sasl_response ) ;
write_all ( & mut stream , res ) . await ? ;
} else {
error! ( " Unsupported SCRAM version: {} " , sasl_type ) ;
return Err ( Error ::ServerError ) ;
}
}
SASL_CONTINUE = > {
trace! ( " Continuing SASL " ) ;
let mut sasl_data = vec! [ 0 u8 ; ( len - 8 ) as usize ] ;
match stream . read_exact ( & mut sasl_data ) . await {
Ok ( _ ) = > ( ) ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading sasl cont message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-06-18 18:36:00 -07:00
} ;
let msg = BytesMut ::from ( & sasl_data [ .. ] ) ;
let sasl_response = scram . update ( & msg ) ? ;
2022-06-20 06:15:54 -07:00
// SASLResponse
2022-06-18 18:36:00 -07:00
let mut res = BytesMut ::new ( ) ;
res . put_u8 ( b 'p' ) ;
res . put_i32 ( 4 + sasl_response . len ( ) as i32 ) ;
res . put ( sasl_response ) ;
write_all ( & mut stream , res ) . await ? ;
}
SASL_FINAL = > {
trace! ( " Final SASL " ) ;
let mut sasl_final = vec! [ 0 u8 ; len as usize - 8 ] ;
match stream . read_exact ( & mut sasl_final ) . await {
Ok ( _ ) = > ( ) ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading sasl final message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-06-18 18:36:00 -07:00
} ;
match scram . finish ( & BytesMut ::from ( & sasl_final [ .. ] ) ) {
Ok ( _ ) = > {
debug! ( " SASL authentication successful " ) ;
}
Err ( err ) = > {
debug! ( " SASL authentication failed " ) ;
return Err ( err ) ;
}
} ;
}
2022-02-03 15:17:04 -08:00
_ = > {
2022-02-20 22:47:08 -08:00
error! ( " Unsupported authentication mechanism: {} " , auth_code ) ;
2022-02-03 15:17:04 -08:00
return Err ( Error ::ServerError ) ;
}
}
}
2022-02-15 22:45:45 -08:00
// ErrorResponse
2022-02-03 15:17:04 -08:00
'E' = > {
2022-02-07 11:15:33 -08:00
let error_code = match stream . read_u8 ( ) . await {
Ok ( error_code ) = > error_code ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading error code message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-07 11:15:33 -08:00
} ;
2022-02-24 08:44:41 -08:00
trace! ( " Error: {} " , error_code ) ;
2022-02-22 19:26:08 -08:00
2022-02-07 11:15:33 -08:00
match error_code {
2022-02-15 22:45:45 -08:00
// No error message is present in the message.
MESSAGE_TERMINATOR = > ( ) ,
// An error message will be present.
2022-02-07 11:15:33 -08:00
_ = > {
2022-02-15 22:45:45 -08:00
// Read the error message without the terminating null character.
2022-02-07 11:15:33 -08:00
let mut error = vec! [ 0 u8 ; len as usize - 4 - 1 ] ;
2022-02-15 22:45:45 -08:00
2022-02-07 11:15:33 -08:00
match stream . read_exact ( & mut error ) . await {
Ok ( _ ) = > ( ) ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading error message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-07 11:15:33 -08:00
} ;
2022-02-15 22:45:45 -08:00
// TODO: the error message contains multiple fields; we can decode them and
// present a prettier message to the user.
// See: https://www.postgresql.org/docs/12/protocol-error-fields.html
2022-02-20 22:47:08 -08:00
error! ( " Server error: {} " , String ::from_utf8_lossy ( & error ) ) ;
2022-02-07 11:15:33 -08:00
}
} ;
2022-02-15 22:45:45 -08:00
2022-02-03 15:17:04 -08:00
return Err ( Error ::ServerError ) ;
}
2022-02-15 22:45:45 -08:00
// ParameterStatus
2022-02-03 15:17:04 -08:00
'S' = > {
let mut param = vec! [ 0 u8 ; len as usize - 4 ] ;
2022-02-15 22:45:45 -08:00
2022-02-03 15:17:04 -08:00
match stream . read_exact ( & mut param ) . await {
Ok ( _ ) = > ( ) ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading parameter status message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 15:17:04 -08:00
} ;
2022-02-03 17:48:37 -08:00
2022-02-15 22:45:45 -08:00
// Save the parameter so we can pass it to the client later.
// These can be server_encoding, client_encoding, server timezone, Postgres version,
// and many more interesting things we should know about the Postgres server we are talking to.
2022-02-03 17:48:37 -08:00
server_info . put_u8 ( b 'S' ) ;
server_info . put_i32 ( len ) ;
server_info . put_slice ( & param [ .. ] ) ;
2022-02-03 15:17:04 -08:00
}
2022-02-15 22:45:45 -08:00
// BackendKeyData
2022-02-03 15:17:04 -08:00
'K' = > {
2022-02-15 22:45:45 -08:00
// The frontend must save these values if it wishes to be able to issue CancelRequest messages later.
2022-03-10 01:33:29 -08:00
// See: <https://www.postgresql.org/docs/12/protocol-message-formats.html>.
2022-02-20 22:47:08 -08:00
process_id = match stream . read_i32 ( ) . await {
2022-02-03 17:48:37 -08:00
Ok ( id ) = > id ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading process id message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 17:48:37 -08:00
} ;
secret_key = match stream . read_i32 ( ) . await {
Ok ( id ) = > id ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading secret key message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 15:17:04 -08:00
} ;
}
2022-02-15 22:45:45 -08:00
// ReadyForQuery
2022-02-03 15:17:04 -08:00
'Z' = > {
let mut idle = vec! [ 0 u8 ; len as usize - 4 ] ;
match stream . read_exact ( & mut idle ) . await {
Ok ( _ ) = > ( ) ,
2022-11-17 09:24:39 -08:00
Err ( _ ) = > return Err ( Error ::SocketError ( format! ( " Error reading transaction status message on server startup {{ username: {:?} , database: {:?} }} " , user . username , database ) ) ) ,
2022-02-03 15:17:04 -08:00
} ;
let ( read , write ) = stream . into_split ( ) ;
2022-06-05 09:48:06 -07:00
let mut server = Server {
2022-02-15 08:18:01 -08:00
address : address . clone ( ) ,
2022-02-03 15:17:04 -08:00
read : BufReader ::new ( read ) ,
2022-11-10 02:04:31 +08:00
write ,
2022-02-03 15:17:04 -08:00
buffer : BytesMut ::with_capacity ( 8196 ) ,
2022-11-10 02:04:31 +08:00
server_info ,
process_id ,
secret_key ,
2022-02-03 16:25:05 -08:00
in_transaction : false ,
2022-02-04 08:26:50 -08:00
data_available : false ,
2022-02-03 17:06:19 -08:00
bad : false ,
2022-09-01 22:06:55 -05:00
needs_cleanup : false ,
2022-11-10 02:04:31 +08:00
client_server_map ,
2022-02-12 10:16:05 -08:00
connected_at : chrono ::offset ::Utc ::now ( ) . naive_utc ( ) ,
2022-11-10 02:04:31 +08:00
stats ,
2022-06-05 09:48:06 -07:00
application_name : String ::new ( ) ,
2022-08-11 17:42:40 -04:00
last_activity : SystemTime ::now ( ) ,
2023-03-10 06:23:51 -06:00
mirror_manager : match address . mirrors . len ( ) {
0 = > None ,
_ = > Some ( MirroringManager ::from_addresses (
user . clone ( ) ,
database . to_owned ( ) ,
address . mirrors . clone ( ) ,
) ) ,
} ,
2022-06-05 09:48:06 -07:00
} ;
server . set_name ( " pgcat " ) . await ? ;
return Ok ( server ) ;
2022-02-03 15:17:04 -08:00
}
2022-02-15 22:45:45 -08:00
// We have an unexpected message from the server during this exchange.
// Means we implemented the protocol wrong or we're not talking to a Postgres server.
2022-02-03 15:17:04 -08:00
_ = > {
2022-02-20 22:47:08 -08:00
error! ( " Unknown code: {} " , code ) ;
2022-11-17 09:24:39 -08:00
return Err ( Error ::ProtocolSyncError ( format! (
" Unknown server code: {} " ,
code
) ) ) ;
2022-02-03 15:17:04 -08:00
}
} ;
}
}
2022-02-15 22:45:45 -08:00
/// Issue a query cancellation request to the server.
2022-02-04 16:01:35 -08:00
/// Uses a separate connection that's not part of the connection pool.
pub async fn cancel (
host : & str ,
2022-08-25 06:40:56 -07:00
port : u16 ,
2022-02-04 16:01:35 -08:00
process_id : i32 ,
secret_key : i32 ,
) -> Result < ( ) , Error > {
let mut stream = match TcpStream ::connect ( & format! ( " {} : {} " , host , port ) ) . await {
Ok ( stream ) = > stream ,
Err ( err ) = > {
2022-02-20 22:47:08 -08:00
error! ( " Could not connect to server: {} " , err ) ;
2022-11-17 09:24:39 -08:00
return Err ( Error ::SocketError ( format! ( " Error reading cancel message " ) ) ) ;
2022-02-04 16:01:35 -08:00
}
} ;
2023-02-08 11:35:38 -06:00
configure_socket ( & stream ) ;
2022-02-04 16:01:35 -08:00
2022-02-22 19:26:08 -08:00
debug! ( " Sending CancelRequest " ) ;
2022-02-04 16:01:35 -08:00
let mut bytes = BytesMut ::with_capacity ( 16 ) ;
bytes . put_i32 ( 16 ) ;
2022-02-15 22:45:45 -08:00
bytes . put_i32 ( CANCEL_REQUEST_CODE ) ;
2022-02-04 16:01:35 -08:00
bytes . put_i32 ( process_id ) ;
bytes . put_i32 ( secret_key ) ;
2022-11-10 02:04:31 +08:00
write_all ( & mut stream , bytes ) . await
2022-02-04 16:01:35 -08:00
}
2022-02-15 22:45:45 -08:00
/// Send messages to the server from the client.
2023-01-17 20:39:55 -05:00
pub async fn send ( & mut self , messages : & BytesMut ) -> Result < ( ) , Error > {
2023-03-10 06:23:51 -06:00
self . mirror_send ( messages ) ;
2023-03-28 17:19:37 +02:00
self . stats ( ) . data_sent ( messages . len ( ) ) ;
2022-02-14 10:00:55 -08:00
2022-02-03 17:06:19 -08:00
match write_all_half ( & mut self . write , messages ) . await {
2022-08-11 17:42:40 -04:00
Ok ( _ ) = > {
// Successfully sent to server
self . last_activity = SystemTime ::now ( ) ;
Ok ( ( ) )
}
2022-02-03 17:06:19 -08:00
Err ( err ) = > {
2022-02-20 22:47:08 -08:00
error! ( " Terminating server because of: {:?} " , err ) ;
2022-02-03 17:06:19 -08:00
self . bad = true ;
Err ( err )
}
}
2022-02-03 15:17:04 -08:00
}
2022-02-15 22:45:45 -08:00
/// Receive data from the server in response to a client request.
2022-02-04 09:28:52 -08:00
/// This method must be called multiple times while `self.is_data_available()` is true
/// in order to receive all data the server has to offer.
2022-02-03 15:17:04 -08:00
pub async fn recv ( & mut self ) -> Result < BytesMut , Error > {
loop {
2022-02-03 17:06:19 -08:00
let mut message = match read_message ( & mut self . read ) . await {
Ok ( message ) = > message ,
Err ( err ) = > {
2022-02-20 22:47:08 -08:00
error! ( " Terminating server because of: {:?} " , err ) ;
2022-02-03 17:06:19 -08:00
self . bad = true ;
return Err ( err ) ;
}
} ;
2022-02-03 15:17:04 -08:00
2022-02-15 22:45:45 -08:00
// Buffer the message we'll forward to the client later.
2022-02-03 15:17:04 -08:00
self . buffer . put ( & message [ .. ] ) ;
let code = message . get_u8 ( ) as char ;
2022-02-03 16:25:05 -08:00
let _len = message . get_i32 ( ) ;
2022-02-03 15:33:26 -08:00
2022-02-24 08:44:41 -08:00
trace! ( " Message: {} " , code ) ;
2022-02-22 19:26:08 -08:00
2022-02-03 15:17:04 -08:00
match code {
2022-02-15 22:45:45 -08:00
// ReadyForQuery
2022-02-03 15:17:04 -08:00
'Z' = > {
2022-02-03 16:25:05 -08:00
let transaction_state = message . get_u8 ( ) as char ;
2022-02-03 17:06:19 -08:00
2022-02-03 16:25:05 -08:00
match transaction_state {
2022-02-15 22:45:45 -08:00
// In transaction.
2022-02-03 16:25:05 -08:00
'T' = > {
self . in_transaction = true ;
2022-02-03 17:06:19 -08:00
}
2022-02-03 16:25:05 -08:00
2022-02-15 22:45:45 -08:00
// Idle, transaction over.
2022-02-03 16:25:05 -08:00
'I' = > {
self . in_transaction = false ;
2022-02-03 17:06:19 -08:00
}
2022-08-11 17:42:40 -04:00
// Some error occurred, the transaction was rolled back.
2022-02-03 17:06:19 -08:00
'E' = > {
2022-02-03 18:13:36 -08:00
self . in_transaction = true ;
2022-02-03 17:06:19 -08:00
}
2022-02-03 16:25:05 -08:00
2022-02-15 22:45:45 -08:00
// Something totally unexpected, this is not a Postgres server we know.
2022-02-03 16:25:05 -08:00
_ = > {
2022-02-03 17:06:19 -08:00
self . bad = true ;
2022-11-17 09:24:39 -08:00
return Err ( Error ::ProtocolSyncError ( format! (
" Unknown transaction state: {} " ,
transaction_state
) ) ) ;
2022-02-03 17:06:19 -08:00
}
2022-02-03 16:25:05 -08:00
} ;
2022-02-15 22:45:45 -08:00
// There is no more data available from the server.
2022-02-05 14:38:41 -08:00
self . data_available = false ;
2022-02-03 15:17:04 -08:00
break ;
2022-02-03 17:06:19 -08:00
}
2022-02-03 15:17:04 -08:00
2022-09-01 22:06:55 -05:00
// CommandComplete
'C' = > {
let mut command_tag = String ::new ( ) ;
match message . reader ( ) . read_to_string ( & mut command_tag ) {
Ok ( _ ) = > {
// Non-exhaustive list of commands that are likely to change session variables/resources
// which can leak between clients. This is a best effort to block bad clients
// from poisoning a transaction-mode pool by setting inappropriate session variables
match command_tag . as_str ( ) {
2022-10-13 22:33:12 -04:00
" SET \0 " = > {
// We don't detect set statements in transactions
// No great way to differentiate between set and set local
// As a result, we will miss cases when set statements are used in transactions
// This will reduce amount of discard statements sent
if ! self . in_transaction {
debug! ( " Server connection marked for clean up " ) ;
self . needs_cleanup = true ;
}
}
" PREPARE \0 " = > {
2022-09-01 22:06:55 -05:00
debug! ( " Server connection marked for clean up " ) ;
self . needs_cleanup = true ;
}
_ = > ( ) ,
}
}
Err ( err ) = > {
warn! ( " Encountered an error while parsing CommandTag {} " , err ) ;
}
}
}
2022-02-15 22:45:45 -08:00
// DataRow
2022-02-05 14:38:41 -08:00
'D' = > {
2022-02-15 22:45:45 -08:00
// More data is available after this message, this is not the end of the reply.
2022-02-05 14:38:41 -08:00
self . data_available = true ;
2022-03-10 01:33:29 -08:00
// Don't flush yet, the more we buffer, the faster this goes...up to a limit.
2022-02-05 14:38:41 -08:00
if self . buffer . len ( ) > = 8196 {
break ;
}
}
2022-02-15 22:45:45 -08:00
// CopyInResponse: copy is starting from client to server.
2022-02-04 08:06:45 -08:00
'G' = > break ,
2022-02-15 22:45:45 -08:00
// CopyOutResponse: copy is starting from the server to the client.
2022-02-04 08:26:50 -08:00
'H' = > {
self . data_available = true ;
break ;
}
2022-12-21 09:57:53 -05:00
// CopyData
'd' = > {
// Don't flush yet, buffer until we reach limit
if self . buffer . len ( ) > = 8196 {
break ;
}
}
2022-02-04 08:26:50 -08:00
// CopyDone
2022-02-15 22:45:45 -08:00
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
'c' = > ( ) ,
2022-02-04 08:26:50 -08:00
2022-02-15 22:45:45 -08:00
// Anything else, e.g. errors, notices, etc.
// Keep buffering until ReadyForQuery shows up.
_ = > ( ) ,
2022-02-03 15:17:04 -08:00
} ;
}
let bytes = self . buffer . clone ( ) ;
2022-02-15 22:45:45 -08:00
// Keep track of how much data we got from the server for stats.
2023-03-28 17:19:37 +02:00
self . stats ( ) . data_received ( bytes . len ( ) ) ;
2022-02-15 22:45:45 -08:00
// Clear the buffer for next query.
2022-02-03 15:17:04 -08:00
self . buffer . clear ( ) ;
2022-08-11 17:42:40 -04:00
// Successfully received data from server
self . last_activity = SystemTime ::now ( ) ;
2022-02-15 22:45:45 -08:00
// Pass the data back to the client.
2022-02-03 15:17:04 -08:00
Ok ( bytes )
}
2022-02-03 16:25:05 -08:00
2022-02-04 09:28:52 -08:00
/// If the server is still inside a transaction.
/// If the client disconnects while the server is in a transaction, we will clean it up.
2022-02-03 16:25:05 -08:00
pub fn in_transaction ( & self ) -> bool {
2023-01-28 15:36:35 -08:00
debug! ( " Server in transaction: {} " , self . in_transaction ) ;
2022-02-03 16:25:05 -08:00
self . in_transaction
}
2022-02-03 17:06:19 -08:00
2022-02-04 09:28:52 -08:00
/// We don't buffer all of server responses, e.g. COPY OUT produces too much data.
/// The client is responsible to call `self.recv()` while this method returns true.
2022-02-04 08:26:50 -08:00
pub fn is_data_available ( & self ) -> bool {
self . data_available
}
2022-02-04 09:28:52 -08:00
/// Server & client are out of sync, we must discard this connection.
/// This happens with clients that misbehave.
2022-02-03 17:06:19 -08:00
pub fn is_bad ( & self ) -> bool {
self . bad
}
2022-02-04 09:28:52 -08:00
/// Get server startup information to forward it to the client.
/// Not used at the moment.
2022-02-03 17:48:37 -08:00
pub fn server_info ( & self ) -> BytesMut {
self . server_info . clone ( )
}
2022-02-04 09:28:52 -08:00
/// Indicate that this server connection cannot be re-used and must be discarded.
2022-02-03 17:06:19 -08:00
pub fn mark_bad ( & mut self ) {
2022-03-10 01:33:29 -08:00
error! ( " Server {:?} marked bad " , self . address ) ;
2022-02-03 17:06:19 -08:00
self . bad = true ;
}
2022-02-04 16:08:18 -08:00
/// Claim this server as mine for the purposes of query cancellation.
2022-02-04 16:01:35 -08:00
pub fn claim ( & mut self , process_id : i32 , secret_key : i32 ) {
2022-02-24 08:44:41 -08:00
let mut guard = self . client_server_map . lock ( ) ;
2022-02-05 10:02:13 -08:00
guard . insert (
( process_id , secret_key ) ,
(
2022-02-20 22:47:08 -08:00
self . process_id ,
2022-02-05 10:02:13 -08:00
self . secret_key ,
2022-02-15 08:18:01 -08:00
self . address . host . clone ( ) ,
2022-08-25 06:40:56 -07:00
self . address . port ,
2022-02-05 10:02:13 -08:00
) ,
) ;
2022-02-04 16:01:35 -08:00
}
2022-02-04 09:28:52 -08:00
/// Execute an arbitrary query against the server.
2022-02-15 22:45:45 -08:00
/// It will use the simple query protocol.
2022-02-04 09:28:52 -08:00
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
2022-02-03 17:32:04 -08:00
pub async fn query ( & mut self , query : & str ) -> Result < ( ) , Error > {
2022-02-24 08:44:41 -08:00
let query = simple_query ( query ) ;
2022-02-03 17:06:19 -08:00
2023-01-17 20:39:55 -05:00
self . send ( & query ) . await ? ;
2022-02-15 22:45:45 -08:00
2022-02-05 14:38:41 -08:00
loop {
let _ = self . recv ( ) . await ? ;
2022-02-15 22:45:45 -08:00
2022-02-05 14:38:41 -08:00
if ! self . data_available {
break ;
}
}
2022-02-03 17:06:19 -08:00
Ok ( ( ) )
}
2022-02-03 17:32:04 -08:00
2022-09-01 22:06:55 -05:00
/// Perform any necessary cleanup before putting the server
/// connection back in the pool
pub async fn checkin_cleanup ( & mut self ) -> Result < ( ) , Error > {
// Client disconnected with an open transaction on the server connection.
// Pgbouncer behavior is to close the server connection but that can cause
// server connection thrashing if clients repeatedly do this.
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
if self . in_transaction ( ) {
2022-09-22 13:07:02 -04:00
warn! ( " Server returned while still in transaction, rolling back transaction " ) ;
2022-09-01 22:06:55 -05:00
self . query ( " ROLLBACK " ) . await ? ;
}
2022-10-13 22:33:12 -04:00
// Client disconnected but it performed session-altering operations such as
2022-09-01 22:06:55 -05:00
// SET statement_timeout to 1 or create a prepared statement. We clear that
// to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self . needs_cleanup {
2022-09-22 13:07:02 -04:00
warn! ( " Server returned with session state altered, discarding state " ) ;
2022-09-01 22:06:55 -05:00
self . query ( " DISCARD ALL " ) . await ? ;
self . needs_cleanup = false ;
}
2022-11-10 02:04:31 +08:00
Ok ( ( ) )
2022-09-01 22:06:55 -05:00
}
2022-02-04 09:28:52 -08:00
/// A shorthand for `SET application_name = $1`.
2022-02-03 17:32:04 -08:00
pub async fn set_name ( & mut self , name : & str ) -> Result < ( ) , Error > {
2022-06-05 09:48:06 -07:00
if self . application_name ! = name {
self . application_name = name . to_string ( ) ;
2022-09-01 22:06:55 -05:00
// We don't want `SET application_name` to mark the server connection
// as needing cleanup
let needs_cleanup_before = self . needs_cleanup ;
let result = Ok ( self
2022-06-05 09:48:06 -07:00
. query ( & format! ( " SET application_name = ' {} ' " , name ) )
2022-09-01 22:06:55 -05:00
. await ? ) ;
self . needs_cleanup = needs_cleanup_before ;
2022-11-10 02:04:31 +08:00
result
2022-06-05 09:48:06 -07:00
} else {
Ok ( ( ) )
}
2022-02-03 17:32:04 -08:00
}
2022-02-05 13:15:53 -08:00
2023-03-28 17:19:37 +02:00
/// Returns a new handle to this server's stats reporter.
pub fn stats(&self) -> Arc<ServerStats> {
    Arc::clone(&self.stats)
}
2022-02-15 22:45:45 -08:00
/// Get the servers address.
#[ allow(dead_code) ]
2022-02-05 13:15:53 -08:00
pub fn address ( & self ) -> Address {
2022-02-15 08:18:01 -08:00
self . address . clone ( )
2022-02-05 13:15:53 -08:00
}
2022-02-20 22:47:08 -08:00
2022-08-11 17:42:40 -04:00
/// Time of the last successful send to, or response from, the server.
pub fn last_activity(&self) -> SystemTime {
    self.last_activity
}
2022-09-07 22:37:17 -05:00
/// Flags the connection as needing `DISCARD ALL` at checkin.
pub fn mark_dirty(&mut self) {
    self.needs_cleanup = true;
}
2023-03-10 06:23:51 -06:00
/// Hands `bytes` to the mirror manager; a no-op when there is none.
pub fn mirror_send(&mut self, bytes: &BytesMut) {
    if let Some(manager) = self.mirror_manager.as_mut() {
        manager.send(bytes)
    }
}
pub fn mirror_disconnect ( & mut self ) {
match self . mirror_manager . as_mut ( ) {
Some ( manager ) = > manager . disconnect ( ) ,
None = > ( ) ,
}
}
2022-02-03 15:17:04 -08:00
}
2022-02-12 10:16:05 -08:00
impl Drop for Server {
2022-02-15 22:45:45 -08:00
/// Try to do a clean shut down. Best effort because
/// the socket is in non-blocking mode, so it may not be ready
/// for a write.
2022-02-12 10:16:05 -08:00
fn drop ( & mut self ) {
2023-03-10 06:23:51 -06:00
self . mirror_disconnect ( ) ;
2023-03-28 17:19:37 +02:00
// Update statistics
self . stats . disconnect ( ) ;
2022-02-20 22:47:08 -08:00
2022-02-12 10:16:05 -08:00
let mut bytes = BytesMut ::with_capacity ( 4 ) ;
bytes . put_u8 ( b 'X' ) ;
bytes . put_i32 ( 4 ) ;
match self . write . try_write ( & bytes ) {
2022-02-15 22:45:45 -08:00
Ok ( _ ) = > ( ) ,
2022-03-10 01:33:29 -08:00
Err ( _ ) = > debug! ( " Dirty shutdown " ) ,
2022-02-12 10:16:05 -08:00
} ;
2022-03-10 01:33:29 -08:00
// Should not matter.
2022-02-12 10:16:05 -08:00
self . bad = true ;
let now = chrono ::offset ::Utc ::now ( ) . naive_utc ( ) ;
let duration = now - self . connected_at ;
2022-02-20 22:47:08 -08:00
info! (
2022-09-01 13:16:22 -05:00
" Server connection closed {:?}, session duration: {} " ,
self . address ,
2022-02-12 10:16:05 -08:00
crate ::format_duration ( & duration )
) ;
}
}