2022-02-15 22:45:45 -08:00
|
|
|
/// Helper functions to send one-off protocol messages
|
|
|
|
|
/// and handle TcpStream (TCP socket).
|
2022-02-14 05:11:53 -08:00
|
|
|
use bytes::{Buf, BufMut, BytesMut};
|
2023-02-08 11:35:38 -06:00
|
|
|
use log::error;
|
2022-02-03 15:17:04 -08:00
|
|
|
use md5::{Digest, Md5};
|
2023-02-08 11:35:38 -06:00
|
|
|
use socket2::{SockRef, TcpKeepalive};
|
2022-06-27 16:45:41 -07:00
|
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
|
use tokio::net::TcpStream;
|
2022-02-03 13:35:40 -08:00
|
|
|
|
2023-02-08 11:35:38 -06:00
|
|
|
use crate::config::get_config;
|
2022-03-01 08:47:19 -08:00
|
|
|
use crate::errors::Error;
|
2022-02-14 05:11:53 -08:00
|
|
|
use std::collections::HashMap;
|
2023-01-19 10:19:49 -05:00
|
|
|
use std::io::{BufRead, Cursor};
|
2022-07-31 21:52:23 -05:00
|
|
|
use std::mem;
|
2023-02-08 11:35:38 -06:00
|
|
|
use std::time::Duration;
|
2022-02-14 05:11:53 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
/// Postgres data type mappings
/// used in RowDescription ('T') message.
///
/// Each variant is converted to its Postgres type OID by the
/// `From<&DataType> for i32` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataType {
    /// `text` (OID 25).
    Text,
    /// `int4` (OID 23).
    Int4,
    /// `numeric` (OID 1700).
    Numeric,
    /// `bool` (OID 16).
    Bool,
    /// `oid` (OID 26).
    Oid,
    /// `anyarray` pseudo-type (OID 2277).
    AnyArray,
    /// `any` pseudo-type (OID 2276).
    Any,
}
|
|
|
|
|
|
|
|
|
|
impl From<&DataType> for i32 {
|
|
|
|
|
fn from(data_type: &DataType) -> i32 {
|
|
|
|
|
match data_type {
|
|
|
|
|
DataType::Text => 25,
|
|
|
|
|
DataType::Int4 => 23,
|
|
|
|
|
DataType::Numeric => 1700,
|
2023-05-03 09:13:05 -07:00
|
|
|
DataType::Bool => 16,
|
|
|
|
|
DataType::Oid => 26,
|
|
|
|
|
DataType::AnyArray => 2277,
|
|
|
|
|
DataType::Any => 2276,
|
2022-03-01 08:47:19 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-02-03 13:35:40 -08:00
|
|
|
|
2022-02-08 09:33:20 -08:00
|
|
|
/// Tell the client that authentication handshake completed successfully.
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn auth_ok<S>(stream: &mut S) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-03 13:35:40 -08:00
|
|
|
let mut auth_ok = BytesMut::with_capacity(9);
|
|
|
|
|
|
|
|
|
|
auth_ok.put_u8(b'R');
|
|
|
|
|
auth_ok.put_i32(8);
|
|
|
|
|
auth_ok.put_i32(0);
|
|
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
write_all(stream, auth_ok).await
|
2022-02-03 13:35:40 -08:00
|
|
|
}
|
|
|
|
|
|
2022-06-20 06:15:54 -07:00
|
|
|
/// Generate md5 password challenge.
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn md5_challenge<S>(stream: &mut S) -> Result<[u8; 4], Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-06-20 06:15:54 -07:00
|
|
|
// let mut rng = rand::thread_rng();
|
|
|
|
|
let salt: [u8; 4] = [
|
|
|
|
|
rand::random(),
|
|
|
|
|
rand::random(),
|
|
|
|
|
rand::random(),
|
|
|
|
|
rand::random(),
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
let mut res = BytesMut::new();
|
|
|
|
|
res.put_u8(b'R');
|
|
|
|
|
res.put_i32(12);
|
|
|
|
|
res.put_i32(5); // MD5
|
|
|
|
|
res.put_slice(&salt[..]);
|
|
|
|
|
|
|
|
|
|
write_all(stream, res).await?;
|
|
|
|
|
Ok(salt)
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-08 09:33:20 -08:00
|
|
|
/// Give the client the process_id and secret we generated
|
|
|
|
|
/// used in query cancellation.
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn backend_key_data<S>(
|
|
|
|
|
stream: &mut S,
|
2022-02-04 09:28:52 -08:00
|
|
|
backend_id: i32,
|
|
|
|
|
secret_key: i32,
|
2022-06-27 09:46:33 -07:00
|
|
|
) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-04 09:28:52 -08:00
|
|
|
let mut key_data = BytesMut::from(&b"K"[..]);
|
|
|
|
|
key_data.put_i32(12);
|
|
|
|
|
key_data.put_i32(backend_id);
|
|
|
|
|
key_data.put_i32(secret_key);
|
|
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
write_all(stream, key_data).await
|
2022-02-04 09:28:52 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-24 08:44:41 -08:00
|
|
|
/// Construct a `Q`: Query message.
|
2022-02-19 08:57:24 -08:00
|
|
|
pub fn simple_query(query: &str) -> BytesMut {
|
|
|
|
|
let mut res = BytesMut::from(&b"Q"[..]);
|
|
|
|
|
let query = format!("{}\0", query);
|
|
|
|
|
|
|
|
|
|
res.put_i32(query.len() as i32 + 4);
|
2022-11-10 02:04:31 +08:00
|
|
|
res.put_slice(query.as_bytes());
|
2022-02-19 08:57:24 -08:00
|
|
|
|
|
|
|
|
res
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-08 09:33:20 -08:00
|
|
|
/// Tell the client we're ready for another query.
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn ready_for_query<S>(stream: &mut S) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-08-08 19:01:24 -04:00
|
|
|
let mut bytes = BytesMut::with_capacity(
|
|
|
|
|
mem::size_of::<u8>() + mem::size_of::<i32>() + mem::size_of::<u8>(),
|
|
|
|
|
);
|
2022-02-03 13:35:40 -08:00
|
|
|
|
|
|
|
|
bytes.put_u8(b'Z');
|
|
|
|
|
bytes.put_i32(5);
|
|
|
|
|
bytes.put_u8(b'I'); // Idle
|
|
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
write_all(stream, bytes).await
|
2022-02-03 13:35:40 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-08 09:33:20 -08:00
|
|
|
/// Send the startup packet the server. We're pretending we're a Pg client.
|
|
|
|
|
/// This tells the server which user we are and what database we want.
|
2023-04-30 09:41:46 -07:00
|
|
|
pub async fn startup<S>(stream: &mut S, user: &str, database: &str) -> Result<(), Error>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-03 15:17:04 -08:00
|
|
|
let mut bytes = BytesMut::with_capacity(25);
|
|
|
|
|
|
|
|
|
|
bytes.put_i32(196608); // Protocol number
|
|
|
|
|
|
|
|
|
|
// User
|
|
|
|
|
bytes.put(&b"user\0"[..]);
|
2022-11-10 02:04:31 +08:00
|
|
|
bytes.put_slice(user.as_bytes());
|
2022-02-03 15:17:04 -08:00
|
|
|
bytes.put_u8(0);
|
|
|
|
|
|
|
|
|
|
// Database
|
|
|
|
|
bytes.put(&b"database\0"[..]);
|
2022-11-10 02:04:31 +08:00
|
|
|
bytes.put_slice(database.as_bytes());
|
2022-02-03 15:17:04 -08:00
|
|
|
bytes.put_u8(0);
|
|
|
|
|
bytes.put_u8(0); // Null terminator
|
|
|
|
|
|
|
|
|
|
let len = bytes.len() as i32 + 4i32;
|
|
|
|
|
|
|
|
|
|
let mut startup = BytesMut::with_capacity(len as usize);
|
|
|
|
|
|
|
|
|
|
startup.put_i32(len);
|
|
|
|
|
startup.put(bytes);
|
|
|
|
|
|
|
|
|
|
match stream.write_all(&startup).await {
|
|
|
|
|
Ok(_) => Ok(()),
|
2023-01-19 05:18:08 -06:00
|
|
|
Err(err) => {
|
2022-11-17 09:24:39 -08:00
|
|
|
return Err(Error::SocketError(format!(
|
2023-01-19 05:18:08 -06:00
|
|
|
"Error writing startup to server socket - Error: {:?}",
|
|
|
|
|
err
|
2022-11-17 09:24:39 -08:00
|
|
|
)))
|
|
|
|
|
}
|
2022-02-03 15:17:04 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-04-30 09:41:46 -07:00
|
|
|
pub async fn ssl_request(stream: &mut TcpStream) -> Result<(), Error> {
|
|
|
|
|
let mut bytes = BytesMut::with_capacity(12);
|
|
|
|
|
|
|
|
|
|
bytes.put_i32(8);
|
|
|
|
|
bytes.put_i32(80877103);
|
|
|
|
|
|
|
|
|
|
match stream.write_all(&bytes).await {
|
|
|
|
|
Ok(_) => Ok(()),
|
|
|
|
|
Err(err) => Err(Error::SocketError(format!(
|
|
|
|
|
"Error writing SSLRequest to server socket - Error: {:?}",
|
|
|
|
|
err
|
|
|
|
|
))),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-26 11:01:52 -08:00
|
|
|
/// Parse the params the server sends as a key/value format.
|
|
|
|
|
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
2022-02-14 05:11:53 -08:00
|
|
|
let mut result = HashMap::new();
|
|
|
|
|
let mut buf = Vec::new();
|
|
|
|
|
let mut tmp = String::new();
|
|
|
|
|
|
|
|
|
|
while bytes.has_remaining() {
|
|
|
|
|
let mut c = bytes.get_u8();
|
|
|
|
|
|
|
|
|
|
// Null-terminated C-strings.
|
|
|
|
|
while c != 0 {
|
|
|
|
|
tmp.push(c as char);
|
|
|
|
|
c = bytes.get_u8();
|
|
|
|
|
}
|
|
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
if !tmp.is_empty() {
|
2022-02-14 05:11:53 -08:00
|
|
|
buf.push(tmp.clone());
|
|
|
|
|
tmp.clear();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Expect pairs of name and value
|
|
|
|
|
// and at least one pair to be present.
|
2022-02-26 11:01:52 -08:00
|
|
|
if buf.len() % 2 != 0 || buf.len() < 2 {
|
2022-02-14 05:11:53 -08:00
|
|
|
return Err(Error::ClientBadStartup);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut i = 0;
|
|
|
|
|
while i < buf.len() {
|
|
|
|
|
let name = buf[i].clone();
|
|
|
|
|
let value = buf[i + 1].clone();
|
|
|
|
|
let _ = result.insert(name, value);
|
|
|
|
|
i += 2;
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-26 11:01:52 -08:00
|
|
|
Ok(result)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Parse StartupMessage parameters.
|
|
|
|
|
/// e.g. user, database, application_name, etc.
|
|
|
|
|
pub fn parse_startup(bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
|
|
|
|
|
let result = parse_params(bytes)?;
|
|
|
|
|
|
2022-02-14 05:11:53 -08:00
|
|
|
// Minimum required parameters
|
|
|
|
|
// I want to have the user at the very minimum, according to the protocol spec.
|
|
|
|
|
if !result.contains_key("user") {
|
|
|
|
|
return Err(Error::ClientBadStartup);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(result)
|
|
|
|
|
}
|
|
|
|
|
|
2022-06-20 06:15:54 -07:00
|
|
|
/// Create md5 password hash given a salt.
|
|
|
|
|
pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec<u8> {
|
2022-02-03 15:17:04 -08:00
|
|
|
let mut md5 = Md5::new();
|
|
|
|
|
|
|
|
|
|
// First pass
|
|
|
|
|
md5.update(&password.as_bytes());
|
|
|
|
|
md5.update(&user.as_bytes());
|
|
|
|
|
|
|
|
|
|
let output = md5.finalize_reset();
|
|
|
|
|
|
|
|
|
|
// Second pass
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
md5_hash_second_pass(&(format!("{:x}", output)), salt)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn md5_hash_second_pass(hash: &str, salt: &[u8]) -> Vec<u8> {
|
|
|
|
|
let mut md5 = Md5::new();
|
|
|
|
|
// Second pass
|
|
|
|
|
md5.update(hash);
|
2022-02-03 15:17:04 -08:00
|
|
|
md5.update(salt);
|
|
|
|
|
|
|
|
|
|
let mut password = format!("md5{:x}", md5.finalize())
|
|
|
|
|
.chars()
|
|
|
|
|
.map(|x| x as u8)
|
|
|
|
|
.collect::<Vec<u8>>();
|
|
|
|
|
password.push(0);
|
|
|
|
|
|
2022-06-20 06:15:54 -07:00
|
|
|
password
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send password challenge response to the server.
|
|
|
|
|
/// This is the MD5 challenge.
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn md5_password<S>(
|
|
|
|
|
stream: &mut S,
|
2022-06-20 06:15:54 -07:00
|
|
|
user: &str,
|
|
|
|
|
password: &str,
|
|
|
|
|
salt: &[u8],
|
2022-06-27 09:46:33 -07:00
|
|
|
) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-06-20 06:15:54 -07:00
|
|
|
let password = md5_hash_password(user, password, salt);
|
|
|
|
|
|
2022-02-03 15:17:04 -08:00
|
|
|
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
|
Auth passthrough (auth_query) (#266)
* Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band'
queries to servers that don't interfere with pools at all.
In order to reuse startup code for making these simple queries,
we need to set the stats (`Reporter`) optional, so using these
simple queries wont interfere with stats.
* Add auth passthough (auth_query)
Adds a feature that allows setting auth passthrough for md5 auth.
It adds 3 new (general and pool) config parameters:
- `auth_query`: An string containing a query that will be executed on boot
to obtain the hash of a given user. This query have to use a placeholder `$1`,
so pgcat can replace it with the user its trying to fetch the hash from.
- `auth_query_user`: The user to use for connecting to the server and executing the
auth_query.
- `auth_query_password`: The password to use for connecting to the server and executing the
auth_query.
The configuration can be done either on the general config (so pools share them) or in a per-pool basis.
The behavior is, at boot time, when validating server connections, a hash is fetched per server
and stored in the pool. When new server connections are created, and no cleartext password is specified,
the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries
it.
When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether
we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no
hash (we could not obtain one when validating the connection), a new fetch is tried.
Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure
we refetch the hash and retry auth (so password changes can be done).
The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be
obtained during connection validation, or the password has change, we can still connect later.
* Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
|
|
|
|
|
|
|
|
message.put_u8(b'p');
|
|
|
|
|
message.put_i32(password.len() as i32 + 4);
|
|
|
|
|
message.put_slice(&password[..]);
|
|
|
|
|
|
|
|
|
|
write_all(stream, message).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn md5_password_with_hash<S>(stream: &mut S, hash: &str, salt: &[u8]) -> Result<(), Error>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
|
|
|
|
let password = md5_hash_second_pass(hash, salt);
|
|
|
|
|
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
|
2022-02-15 22:45:45 -08:00
|
|
|
|
2022-02-03 15:17:04 -08:00
|
|
|
message.put_u8(b'p');
|
|
|
|
|
message.put_i32(password.len() as i32 + 4);
|
|
|
|
|
message.put_slice(&password[..]);
|
|
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
write_all(stream, message).await
|
2022-02-03 15:17:04 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-09 20:02:20 -08:00
|
|
|
/// Implements a response to our custom `SET SHARDING KEY`
|
|
|
|
|
/// and `SET SERVER ROLE` commands.
|
2022-02-09 06:51:31 -08:00
|
|
|
/// This tells the client we're ready for the next query.
|
2022-06-27 15:52:01 -07:00
|
|
|
pub async fn custom_protocol_response_ok<S>(stream: &mut S, message: &str) -> Result<(), Error>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-08 13:11:50 -08:00
|
|
|
let mut res = BytesMut::with_capacity(25);
|
|
|
|
|
|
2022-02-09 20:02:20 -08:00
|
|
|
let set_complete = BytesMut::from(&format!("{}\0", message)[..]);
|
2022-02-08 13:11:50 -08:00
|
|
|
let len = (set_complete.len() + 4) as i32;
|
|
|
|
|
|
2022-02-09 06:51:31 -08:00
|
|
|
// CommandComplete
|
2022-02-08 13:11:50 -08:00
|
|
|
res.put_u8(b'C');
|
|
|
|
|
res.put_i32(len);
|
|
|
|
|
res.put_slice(&set_complete[..]);
|
|
|
|
|
|
2023-01-17 20:39:55 -05:00
|
|
|
write_all_half(stream, &res).await?;
|
2022-08-08 19:01:24 -04:00
|
|
|
ready_for_query(stream).await
|
2022-02-08 13:11:50 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-16 22:52:11 -08:00
|
|
|
/// Send a custom error message to the client.
|
|
|
|
|
/// Tell the client we are ready for the next query and no rollback is necessary.
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn error_response<S>(stream: &mut S, message: &str) -> Result<(), Error>
|
2022-08-08 19:01:24 -04:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
|
|
|
|
error_response_terminal(stream, message).await?;
|
|
|
|
|
ready_for_query(stream).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send a custom error message to the client.
|
|
|
|
|
/// Tell the client we are ready for the next query and no rollback is necessary.
|
|
|
|
|
/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
|
|
|
|
|
pub async fn error_response_terminal<S>(stream: &mut S, message: &str) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-16 22:52:11 -08:00
|
|
|
let mut error = BytesMut::new();
|
|
|
|
|
|
|
|
|
|
// Error level
|
|
|
|
|
error.put_u8(b'S');
|
|
|
|
|
error.put_slice(&b"FATAL\0"[..]);
|
|
|
|
|
|
|
|
|
|
// Error level (non-translatable)
|
|
|
|
|
error.put_u8(b'V');
|
|
|
|
|
error.put_slice(&b"FATAL\0"[..]);
|
|
|
|
|
|
|
|
|
|
// Error code: not sure how much this matters.
|
|
|
|
|
error.put_u8(b'C');
|
|
|
|
|
error.put_slice(&b"58000\0"[..]); // system_error, see Appendix A.
|
|
|
|
|
|
|
|
|
|
// The short error message.
|
|
|
|
|
error.put_u8(b'M');
|
2022-11-10 02:04:31 +08:00
|
|
|
error.put_slice(format!("{}\0", message).as_bytes());
|
2022-02-16 22:52:11 -08:00
|
|
|
|
|
|
|
|
// No more fields follow.
|
|
|
|
|
error.put_u8(0);
|
|
|
|
|
|
|
|
|
|
// Compose the two message reply.
|
2022-08-08 19:01:24 -04:00
|
|
|
let mut res = BytesMut::with_capacity(error.len() + 5);
|
2022-02-16 22:52:11 -08:00
|
|
|
|
|
|
|
|
res.put_u8(b'E');
|
|
|
|
|
res.put_i32(error.len() as i32 + 4);
|
|
|
|
|
res.put(error);
|
|
|
|
|
|
2023-01-17 20:39:55 -05:00
|
|
|
write_all_half(stream, &res).await
|
2022-02-16 22:52:11 -08:00
|
|
|
}
|
|
|
|
|
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn wrong_password<S>(stream: &mut S, user: &str) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-06-20 06:15:54 -07:00
|
|
|
let mut error = BytesMut::new();
|
|
|
|
|
|
|
|
|
|
// Error level
|
|
|
|
|
error.put_u8(b'S');
|
|
|
|
|
error.put_slice(&b"FATAL\0"[..]);
|
|
|
|
|
|
|
|
|
|
// Error level (non-translatable)
|
|
|
|
|
error.put_u8(b'V');
|
|
|
|
|
error.put_slice(&b"FATAL\0"[..]);
|
|
|
|
|
|
|
|
|
|
// Error code: not sure how much this matters.
|
|
|
|
|
error.put_u8(b'C');
|
|
|
|
|
error.put_slice(&b"28P01\0"[..]); // system_error, see Appendix A.
|
|
|
|
|
|
|
|
|
|
// The short error message.
|
|
|
|
|
error.put_u8(b'M');
|
2022-11-10 02:04:31 +08:00
|
|
|
error.put_slice(format!("password authentication failed for user \"{}\"\0", user).as_bytes());
|
2022-06-20 06:15:54 -07:00
|
|
|
|
|
|
|
|
// No more fields follow.
|
|
|
|
|
error.put_u8(0);
|
|
|
|
|
|
|
|
|
|
// Compose the two message reply.
|
|
|
|
|
let mut res = BytesMut::new();
|
|
|
|
|
|
|
|
|
|
res.put_u8(b'E');
|
|
|
|
|
res.put_i32(error.len() as i32 + 4);
|
|
|
|
|
|
|
|
|
|
res.put(error);
|
|
|
|
|
|
|
|
|
|
write_all(stream, res).await
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-19 08:57:24 -08:00
|
|
|
/// Respond to a SHOW SHARD command.
|
2022-06-27 15:52:01 -07:00
|
|
|
pub async fn show_response<S>(stream: &mut S, name: &str, value: &str) -> Result<(), Error>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-19 08:57:24 -08:00
|
|
|
// A SELECT response consists of:
|
|
|
|
|
// 1. RowDescription
|
|
|
|
|
// 2. One or more DataRow
|
|
|
|
|
// 3. CommandComplete
|
|
|
|
|
// 4. ReadyForQuery
|
|
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// The final messages sent to the client
|
|
|
|
|
let mut res = BytesMut::new();
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// RowDescription
|
|
|
|
|
res.put(row_description(&vec![(name, DataType::Text)]));
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// DataRow
|
|
|
|
|
res.put(data_row(&vec![value.to_string()]));
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// CommandComplete
|
|
|
|
|
res.put(command_complete("SELECT 1"));
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2023-01-17 20:39:55 -05:00
|
|
|
write_all_half(stream, &res).await?;
|
2022-08-08 19:01:24 -04:00
|
|
|
ready_for_query(stream).await
|
2022-03-01 08:47:19 -08:00
|
|
|
}
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
|
|
|
|
|
let mut res = BytesMut::new();
|
|
|
|
|
let mut row_desc = BytesMut::new();
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2023-04-11 09:37:16 +08:00
|
|
|
// how many columns we are storing
|
2022-03-01 08:47:19 -08:00
|
|
|
row_desc.put_i16(columns.len() as i16);
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
for (name, data_type) in columns {
|
|
|
|
|
// Column name
|
2022-11-10 02:04:31 +08:00
|
|
|
row_desc.put_slice(format!("{}\0", name).as_bytes());
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// Doesn't belong to any table
|
|
|
|
|
row_desc.put_i32(0);
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// Doesn't belong to any table
|
|
|
|
|
row_desc.put_i16(0);
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// Text
|
|
|
|
|
row_desc.put_i32(data_type.into());
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// Text size = variable (-1)
|
|
|
|
|
let type_size = match data_type {
|
|
|
|
|
DataType::Text => -1,
|
|
|
|
|
DataType::Int4 => 4,
|
|
|
|
|
DataType::Numeric => -1,
|
2023-05-03 09:13:05 -07:00
|
|
|
DataType::Bool => 1,
|
|
|
|
|
DataType::Oid => 4,
|
|
|
|
|
DataType::AnyArray => -1,
|
|
|
|
|
DataType::Any => -1,
|
2022-03-01 08:47:19 -08:00
|
|
|
};
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
row_desc.put_i16(type_size);
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// Type modifier: none that I know
|
|
|
|
|
row_desc.put_i32(-1);
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
// Format being used: text (0), binary (1)
|
|
|
|
|
row_desc.put_i16(0);
|
|
|
|
|
}
|
2022-02-19 08:57:24 -08:00
|
|
|
|
|
|
|
|
res.put_u8(b'T');
|
|
|
|
|
res.put_i32(row_desc.len() as i32 + 4);
|
|
|
|
|
res.put(row_desc);
|
|
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
res
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Create a DataRow message.
|
2022-03-01 08:47:19 -08:00
|
|
|
pub fn data_row(row: &Vec<String>) -> BytesMut {
|
|
|
|
|
let mut res = BytesMut::new();
|
|
|
|
|
let mut data_row = BytesMut::new();
|
|
|
|
|
|
|
|
|
|
data_row.put_i16(row.len() as i16);
|
|
|
|
|
|
|
|
|
|
for column in row {
|
|
|
|
|
let column = column.as_bytes();
|
|
|
|
|
data_row.put_i32(column.len() as i32);
|
2022-11-10 02:04:31 +08:00
|
|
|
data_row.put_slice(column);
|
2022-03-01 08:47:19 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-19 08:57:24 -08:00
|
|
|
res.put_u8(b'D');
|
|
|
|
|
res.put_i32(data_row.len() as i32 + 4);
|
|
|
|
|
res.put(data_row);
|
|
|
|
|
|
2022-03-01 08:47:19 -08:00
|
|
|
res
|
|
|
|
|
}
|
2022-02-19 08:57:24 -08:00
|
|
|
|
2023-05-03 09:13:05 -07:00
|
|
|
pub fn data_row_nullable(row: &Vec<Option<String>>) -> BytesMut {
|
|
|
|
|
let mut res = BytesMut::new();
|
|
|
|
|
let mut data_row = BytesMut::new();
|
|
|
|
|
|
|
|
|
|
data_row.put_i16(row.len() as i16);
|
|
|
|
|
|
|
|
|
|
for column in row {
|
|
|
|
|
if let Some(column) = column {
|
|
|
|
|
let column = column.as_bytes();
|
|
|
|
|
data_row.put_i32(column.len() as i32);
|
|
|
|
|
data_row.put_slice(column);
|
|
|
|
|
} else {
|
|
|
|
|
data_row.put_i32(-1 as i32);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
res.put_u8(b'D');
|
|
|
|
|
res.put_i32(data_row.len() as i32 + 4);
|
|
|
|
|
res.put(data_row);
|
|
|
|
|
|
|
|
|
|
res
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Create a CommandComplete message.
|
2022-03-01 08:47:19 -08:00
|
|
|
pub fn command_complete(command: &str) -> BytesMut {
|
|
|
|
|
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
|
|
|
|
|
let mut res = BytesMut::new();
|
|
|
|
|
res.put_u8(b'C');
|
|
|
|
|
res.put_i32(cmd.len() as i32 + 4);
|
|
|
|
|
res.put(cmd);
|
|
|
|
|
res
|
2022-02-19 08:57:24 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-08 09:33:20 -08:00
|
|
|
/// Write all data in the buffer to the TcpStream.
|
2022-06-27 09:46:33 -07:00
|
|
|
pub async fn write_all<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-03 13:35:40 -08:00
|
|
|
match stream.write_all(&buf).await {
|
|
|
|
|
Ok(_) => Ok(()),
|
2023-01-19 05:18:08 -06:00
|
|
|
Err(err) => {
|
|
|
|
|
return Err(Error::SocketError(format!(
|
|
|
|
|
"Error writing to socket - Error: {:?}",
|
|
|
|
|
err
|
|
|
|
|
)))
|
|
|
|
|
}
|
2022-02-03 13:35:40 -08:00
|
|
|
}
|
2022-02-03 13:54:07 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-08 09:33:20 -08:00
|
|
|
/// Write all the data in the buffer to the TcpStream, write owned half (see mpsc).
|
2023-01-17 20:39:55 -05:00
|
|
|
pub async fn write_all_half<S>(stream: &mut S, buf: &BytesMut) -> Result<(), Error>
|
2022-06-27 15:52:01 -07:00
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
2023-01-17 20:39:55 -05:00
|
|
|
match stream.write_all(buf).await {
|
2022-02-03 15:17:04 -08:00
|
|
|
Ok(_) => Ok(()),
|
2023-01-19 05:18:08 -06:00
|
|
|
Err(err) => {
|
|
|
|
|
return Err(Error::SocketError(format!(
|
|
|
|
|
"Error writing to socket - Error: {:?}",
|
|
|
|
|
err
|
|
|
|
|
)))
|
|
|
|
|
}
|
2022-02-03 15:17:04 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-04-30 09:41:46 -07:00
|
|
|
pub async fn write_all_flush<S>(stream: &mut S, buf: &[u8]) -> Result<(), Error>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncWrite + std::marker::Unpin,
|
|
|
|
|
{
|
|
|
|
|
match stream.write_all(buf).await {
|
|
|
|
|
Ok(_) => match stream.flush().await {
|
|
|
|
|
Ok(_) => Ok(()),
|
|
|
|
|
Err(err) => {
|
|
|
|
|
return Err(Error::SocketError(format!(
|
|
|
|
|
"Error flushing socket - Error: {:?}",
|
|
|
|
|
err
|
|
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Err(err) => {
|
|
|
|
|
return Err(Error::SocketError(format!(
|
|
|
|
|
"Error writing to socket - Error: {:?}",
|
|
|
|
|
err
|
|
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-03 13:54:07 -08:00
|
|
|
/// Read a complete message from the socket.
|
2022-06-27 15:52:01 -07:00
|
|
|
pub async fn read_message<S>(stream: &mut S) -> Result<BytesMut, Error>
|
|
|
|
|
where
|
|
|
|
|
S: tokio::io::AsyncRead + std::marker::Unpin,
|
|
|
|
|
{
|
2022-02-03 13:54:07 -08:00
|
|
|
let code = match stream.read_u8().await {
|
|
|
|
|
Ok(code) => code,
|
2023-01-19 05:18:08 -06:00
|
|
|
Err(err) => {
|
2022-11-17 09:24:39 -08:00
|
|
|
return Err(Error::SocketError(format!(
|
2023-01-19 05:18:08 -06:00
|
|
|
"Error reading message code from socket - Error {:?}",
|
|
|
|
|
err
|
2022-11-17 09:24:39 -08:00
|
|
|
)))
|
|
|
|
|
}
|
2022-02-03 13:54:07 -08:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let len = match stream.read_i32().await {
|
|
|
|
|
Ok(len) => len,
|
2023-01-19 05:18:08 -06:00
|
|
|
Err(err) => {
|
2022-11-17 09:24:39 -08:00
|
|
|
return Err(Error::SocketError(format!(
|
2023-01-19 05:18:08 -06:00
|
|
|
"Error reading message len from socket - Code: {:?}, Error: {:?}",
|
|
|
|
|
code, err
|
2022-11-17 09:24:39 -08:00
|
|
|
)))
|
|
|
|
|
}
|
2022-02-03 13:54:07 -08:00
|
|
|
};
|
|
|
|
|
|
2023-01-16 23:22:06 -05:00
|
|
|
let mut bytes = BytesMut::with_capacity(len as usize + 1);
|
|
|
|
|
|
|
|
|
|
bytes.put_u8(code);
|
|
|
|
|
bytes.put_i32(len);
|
|
|
|
|
|
|
|
|
|
bytes.resize(bytes.len() + len as usize - mem::size_of::<i32>(), b'0');
|
2022-02-03 13:54:07 -08:00
|
|
|
|
2023-03-17 12:31:43 -05:00
|
|
|
let slice_start = mem::size_of::<u8>() + mem::size_of::<i32>();
|
|
|
|
|
let slice_end = slice_start + len as usize - mem::size_of::<i32>();
|
|
|
|
|
|
|
|
|
|
// Avoids a panic
|
|
|
|
|
if slice_end < slice_start {
|
|
|
|
|
return Err(Error::SocketError(format!(
|
|
|
|
|
"Error reading message from socket - Code: {:?} - Length {:?}, Error: {:?}",
|
|
|
|
|
code, len, "Unexpected length value for message"
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match stream.read_exact(&mut bytes[slice_start..slice_end]).await {
|
2022-02-03 13:54:07 -08:00
|
|
|
Ok(_) => (),
|
2023-01-19 05:18:08 -06:00
|
|
|
Err(err) => {
|
2022-11-17 09:24:39 -08:00
|
|
|
return Err(Error::SocketError(format!(
|
2023-01-19 05:18:08 -06:00
|
|
|
"Error reading message from socket - Code: {:?}, Error: {:?}",
|
|
|
|
|
code, err
|
2022-11-17 09:24:39 -08:00
|
|
|
)))
|
|
|
|
|
}
|
2022-02-03 13:54:07 -08:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(bytes)
|
2022-02-03 15:17:04 -08:00
|
|
|
}
|
2022-07-31 21:52:23 -05:00
|
|
|
|
2022-09-20 21:47:32 -04:00
|
|
|
pub fn server_parameter_message(key: &str, value: &str) -> BytesMut {
|
2022-07-31 21:52:23 -05:00
|
|
|
let mut server_info = BytesMut::new();
|
|
|
|
|
|
|
|
|
|
let null_byte_size = 1;
|
|
|
|
|
let len: usize =
|
|
|
|
|
mem::size_of::<i32>() + key.len() + null_byte_size + value.len() + null_byte_size;
|
|
|
|
|
|
|
|
|
|
server_info.put_slice("S".as_bytes());
|
|
|
|
|
server_info.put_i32(len.try_into().unwrap());
|
|
|
|
|
server_info.put_slice(key.as_bytes());
|
|
|
|
|
server_info.put_bytes(0, 1);
|
|
|
|
|
server_info.put_slice(value.as_bytes());
|
|
|
|
|
server_info.put_bytes(0, 1);
|
|
|
|
|
|
2022-11-10 02:04:31 +08:00
|
|
|
server_info
|
2022-07-31 21:52:23 -05:00
|
|
|
}
|
2023-01-19 10:19:49 -05:00
|
|
|
|
2023-02-08 11:35:38 -06:00
|
|
|
pub fn configure_socket(stream: &TcpStream) {
|
|
|
|
|
let sock_ref = SockRef::from(stream);
|
|
|
|
|
let conf = get_config();
|
|
|
|
|
|
|
|
|
|
match sock_ref.set_keepalive(true) {
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
match sock_ref.set_tcp_keepalive(
|
|
|
|
|
&TcpKeepalive::new()
|
|
|
|
|
.with_interval(Duration::from_secs(conf.general.tcp_keepalives_interval))
|
|
|
|
|
.with_retries(conf.general.tcp_keepalives_count)
|
|
|
|
|
.with_time(Duration::from_secs(conf.general.tcp_keepalives_idle)),
|
|
|
|
|
) {
|
|
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(err) => error!("Could not configure socket: {}", err),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(err) => error!("Could not configure socket: {}", err),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-01-19 10:19:49 -05:00
|
|
|
pub trait BytesMutReader {
|
|
|
|
|
fn read_string(&mut self) -> Result<String, Error>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl BytesMutReader for Cursor<&BytesMut> {
|
|
|
|
|
/// Should only be used when reading strings from the message protocol.
|
|
|
|
|
/// Can be used to read multiple strings from the same message which are separated by the null byte
|
|
|
|
|
fn read_string(&mut self) -> Result<String, Error> {
|
|
|
|
|
let mut buf = vec![];
|
|
|
|
|
match self.read_until(b'\0', &mut buf) {
|
|
|
|
|
Ok(_) => Ok(String::from_utf8_lossy(&buf[..buf.len() - 1]).to_string()),
|
|
|
|
|
Err(err) => return Err(Error::ParseBytesError(err.to_string())),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|