Files
pgcat/src/messages.rs

692 lines
18 KiB
Rust
Raw Normal View History

/// Helper functions to send one-off protocol messages
/// and handle TcpStream (TCP socket).
2022-02-14 05:11:53 -08:00
use bytes::{Buf, BufMut, BytesMut};
use log::error;
2022-02-03 15:17:04 -08:00
use md5::{Digest, Md5};
use socket2::{SockRef, TcpKeepalive};
2022-06-27 16:45:41 -07:00
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
2022-02-03 13:35:40 -08:00
use crate::config::get_config;
2022-03-01 08:47:19 -08:00
use crate::errors::Error;
2022-02-14 05:11:53 -08:00
use std::collections::HashMap;
use std::io::{BufRead, Cursor};
use std::mem;
use std::time::Duration;
2022-02-14 05:11:53 -08:00
2022-03-01 08:47:19 -08:00
/// Postgres data type mappings
/// used in RowDescription ('T') message.
pub enum DataType {
Text,
Int4,
Numeric,
Bool,
Oid,
AnyArray,
Any,
2022-03-01 08:47:19 -08:00
}
impl From<&DataType> for i32 {
fn from(data_type: &DataType) -> i32 {
match data_type {
DataType::Text => 25,
DataType::Int4 => 23,
DataType::Numeric => 1700,
DataType::Bool => 16,
DataType::Oid => 26,
DataType::AnyArray => 2277,
DataType::Any => 2276,
2022-03-01 08:47:19 -08:00
}
}
}
2022-02-03 13:35:40 -08:00
2022-02-08 09:33:20 -08:00
/// Tell the client that authentication handshake completed successfully.
2022-06-27 09:46:33 -07:00
pub async fn auth_ok<S>(stream: &mut S) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
2022-02-03 13:35:40 -08:00
let mut auth_ok = BytesMut::with_capacity(9);
auth_ok.put_u8(b'R');
auth_ok.put_i32(8);
auth_ok.put_i32(0);
write_all(stream, auth_ok).await
2022-02-03 13:35:40 -08:00
}
/// Generate md5 password challenge.
2022-06-27 09:46:33 -07:00
pub async fn md5_challenge<S>(stream: &mut S) -> Result<[u8; 4], Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
// let mut rng = rand::thread_rng();
let salt: [u8; 4] = [
rand::random(),
rand::random(),
rand::random(),
rand::random(),
];
let mut res = BytesMut::new();
res.put_u8(b'R');
res.put_i32(12);
res.put_i32(5); // MD5
res.put_slice(&salt[..]);
write_all(stream, res).await?;
Ok(salt)
}
2022-02-08 09:33:20 -08:00
/// Give the client the process_id and secret we generated
/// used in query cancellation.
2022-06-27 09:46:33 -07:00
pub async fn backend_key_data<S>(
stream: &mut S,
2022-02-04 09:28:52 -08:00
backend_id: i32,
secret_key: i32,
2022-06-27 09:46:33 -07:00
) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
2022-02-04 09:28:52 -08:00
let mut key_data = BytesMut::from(&b"K"[..]);
key_data.put_i32(12);
key_data.put_i32(backend_id);
key_data.put_i32(secret_key);
write_all(stream, key_data).await
2022-02-04 09:28:52 -08:00
}
/// Construct a `Q`: Query message.
pub fn simple_query(query: &str) -> BytesMut {
let mut res = BytesMut::from(&b"Q"[..]);
let query = format!("{}\0", query);
res.put_i32(query.len() as i32 + 4);
res.put_slice(query.as_bytes());
res
}
2022-02-08 09:33:20 -08:00
/// Tell the client we're ready for another query.
2022-06-27 09:46:33 -07:00
pub async fn ready_for_query<S>(stream: &mut S) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
let mut bytes = BytesMut::with_capacity(
mem::size_of::<u8>() + mem::size_of::<i32>() + mem::size_of::<u8>(),
);
2022-02-03 13:35:40 -08:00
bytes.put_u8(b'Z');
bytes.put_i32(5);
bytes.put_u8(b'I'); // Idle
write_all(stream, bytes).await
2022-02-03 13:35:40 -08:00
}
2022-02-08 09:33:20 -08:00
/// Send the startup packet the server. We're pretending we're a Pg client.
/// This tells the server which user we are and what database we want.
pub async fn startup<S>(stream: &mut S, user: &str, database: &str) -> Result<(), Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
2022-02-03 15:17:04 -08:00
let mut bytes = BytesMut::with_capacity(25);
bytes.put_i32(196608); // Protocol number
// User
bytes.put(&b"user\0"[..]);
bytes.put_slice(user.as_bytes());
2022-02-03 15:17:04 -08:00
bytes.put_u8(0);
// Database
bytes.put(&b"database\0"[..]);
bytes.put_slice(database.as_bytes());
2022-02-03 15:17:04 -08:00
bytes.put_u8(0);
bytes.put_u8(0); // Null terminator
let len = bytes.len() as i32 + 4i32;
let mut startup = BytesMut::with_capacity(len as usize);
startup.put_i32(len);
startup.put(bytes);
match stream.write_all(&startup).await {
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing startup to server socket - Error: {:?}",
err
)))
}
2022-02-03 15:17:04 -08:00
}
}
pub async fn ssl_request(stream: &mut TcpStream) -> Result<(), Error> {
let mut bytes = BytesMut::with_capacity(12);
bytes.put_i32(8);
bytes.put_i32(80877103);
match stream.write_all(&bytes).await {
Ok(_) => Ok(()),
Err(err) => Err(Error::SocketError(format!(
"Error writing SSLRequest to server socket - Error: {:?}",
err
))),
}
}
/// Parse the params the server sends as a key/value format.
pub fn parse_params(mut bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
2022-02-14 05:11:53 -08:00
let mut result = HashMap::new();
let mut buf = Vec::new();
let mut tmp = String::new();
while bytes.has_remaining() {
let mut c = bytes.get_u8();
// Null-terminated C-strings.
while c != 0 {
tmp.push(c as char);
c = bytes.get_u8();
}
if !tmp.is_empty() {
2022-02-14 05:11:53 -08:00
buf.push(tmp.clone());
tmp.clear();
}
}
// Expect pairs of name and value
// and at least one pair to be present.
if buf.len() % 2 != 0 || buf.len() < 2 {
2022-02-14 05:11:53 -08:00
return Err(Error::ClientBadStartup);
}
let mut i = 0;
while i < buf.len() {
let name = buf[i].clone();
let value = buf[i + 1].clone();
let _ = result.insert(name, value);
i += 2;
}
Ok(result)
}
/// Parse StartupMessage parameters.
/// e.g. user, database, application_name, etc.
pub fn parse_startup(bytes: BytesMut) -> Result<HashMap<String, String>, Error> {
let result = parse_params(bytes)?;
2022-02-14 05:11:53 -08:00
// Minimum required parameters
// I want to have the user at the very minimum, according to the protocol spec.
if !result.contains_key("user") {
return Err(Error::ClientBadStartup);
}
Ok(result)
}
/// Create md5 password hash given a salt.
pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec<u8> {
2022-02-03 15:17:04 -08:00
let mut md5 = Md5::new();
// First pass
md5.update(&password.as_bytes());
md5.update(&user.as_bytes());
let output = md5.finalize_reset();
// Second pass
Auth passthrough (auth_query) (#266) * Add a new exec_simple_query method This adds a new `exec_simple_query` method so we can make 'out of band' queries to servers that don't interfere with pools at all. In order to reuse startup code for making these simple queries, we need to set the stats (`Reporter`) optional, so using these simple queries wont interfere with stats. * Add auth passthough (auth_query) Adds a feature that allows setting auth passthrough for md5 auth. It adds 3 new (general and pool) config parameters: - `auth_query`: An string containing a query that will be executed on boot to obtain the hash of a given user. This query have to use a placeholder `$1`, so pgcat can replace it with the user its trying to fetch the hash from. - `auth_query_user`: The user to use for connecting to the server and executing the auth_query. - `auth_query_password`: The password to use for connecting to the server and executing the auth_query. The configuration can be done either on the general config (so pools share them) or in a per-pool basis. The behavior is, at boot time, when validating server connections, a hash is fetched per server and stored in the pool. When new server connections are created, and no cleartext password is specified, the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries it. When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no hash (we could not obtain one when validating the connection), a new fetch is tried. Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure we refetch the hash and retry auth (so password changes can be done). The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be obtained during connection validation, or the password has change, we can still connect later. * Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
md5_hash_second_pass(&(format!("{:x}", output)), salt)
}
pub fn md5_hash_second_pass(hash: &str, salt: &[u8]) -> Vec<u8> {
let mut md5 = Md5::new();
// Second pass
md5.update(hash);
2022-02-03 15:17:04 -08:00
md5.update(salt);
let mut password = format!("md5{:x}", md5.finalize())
.chars()
.map(|x| x as u8)
.collect::<Vec<u8>>();
password.push(0);
password
}
/// Send password challenge response to the server.
/// This is the MD5 challenge.
2022-06-27 09:46:33 -07:00
pub async fn md5_password<S>(
stream: &mut S,
user: &str,
password: &str,
salt: &[u8],
2022-06-27 09:46:33 -07:00
) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
let password = md5_hash_password(user, password, salt);
2022-02-03 15:17:04 -08:00
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
Auth passthrough (auth_query) (#266) * Add a new exec_simple_query method This adds a new `exec_simple_query` method so we can make 'out of band' queries to servers that don't interfere with pools at all. In order to reuse startup code for making these simple queries, we need to set the stats (`Reporter`) optional, so using these simple queries wont interfere with stats. * Add auth passthough (auth_query) Adds a feature that allows setting auth passthrough for md5 auth. It adds 3 new (general and pool) config parameters: - `auth_query`: An string containing a query that will be executed on boot to obtain the hash of a given user. This query have to use a placeholder `$1`, so pgcat can replace it with the user its trying to fetch the hash from. - `auth_query_user`: The user to use for connecting to the server and executing the auth_query. - `auth_query_password`: The password to use for connecting to the server and executing the auth_query. The configuration can be done either on the general config (so pools share them) or in a per-pool basis. The behavior is, at boot time, when validating server connections, a hash is fetched per server and stored in the pool. When new server connections are created, and no cleartext password is specified, the obtained hash is used for creating them, if the hash could not be obtained for whatever reason, it retries it. When client authentication is tried, it uses cleartext passwords if specified, it not, it checks whether we have query_auth set up, if so, it tries to use the obtained hash for making client auth. If there is no hash (we could not obtain one when validating the connection), a new fetch is tried. Once we have a hash, we authenticate using it against whathever the client has sent us, if there is a failure we refetch the hash and retry auth (so password changes can be done). The idea with this 'retrial' mechanism is to make it fault tolerant, so if for whatever reason hash could not be obtained during connection validation, or the password has change, we can still connect later. * Add documentation for Auth passthrough
2023-03-30 22:29:23 +02:00
message.put_u8(b'p');
message.put_i32(password.len() as i32 + 4);
message.put_slice(&password[..]);
write_all(stream, message).await
}
pub async fn md5_password_with_hash<S>(stream: &mut S, hash: &str, salt: &[u8]) -> Result<(), Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
let password = md5_hash_second_pass(hash, salt);
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
2022-02-03 15:17:04 -08:00
message.put_u8(b'p');
message.put_i32(password.len() as i32 + 4);
message.put_slice(&password[..]);
write_all(stream, message).await
2022-02-03 15:17:04 -08:00
}
2022-02-09 20:02:20 -08:00
/// Implements a response to our custom `SET SHARDING KEY`
/// and `SET SERVER ROLE` commands.
2022-02-09 06:51:31 -08:00
/// This tells the client we're ready for the next query.
2022-06-27 15:52:01 -07:00
pub async fn custom_protocol_response_ok<S>(stream: &mut S, message: &str) -> Result<(), Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
2022-02-08 13:11:50 -08:00
let mut res = BytesMut::with_capacity(25);
2022-02-09 20:02:20 -08:00
let set_complete = BytesMut::from(&format!("{}\0", message)[..]);
2022-02-08 13:11:50 -08:00
let len = (set_complete.len() + 4) as i32;
2022-02-09 06:51:31 -08:00
// CommandComplete
2022-02-08 13:11:50 -08:00
res.put_u8(b'C');
res.put_i32(len);
res.put_slice(&set_complete[..]);
write_all_half(stream, &res).await?;
ready_for_query(stream).await
2022-02-08 13:11:50 -08:00
}
/// Send a custom error message to the client.
/// Tell the client we are ready for the next query and no rollback is necessary.
/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
2022-06-27 09:46:33 -07:00
pub async fn error_response<S>(stream: &mut S, message: &str) -> Result<(), Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
error_response_terminal(stream, message).await?;
ready_for_query(stream).await
}
/// Send a custom error message to the client.
/// Tell the client we are ready for the next query and no rollback is necessary.
/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>.
pub async fn error_response_terminal<S>(stream: &mut S, message: &str) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
let mut error = BytesMut::new();
// Error level
error.put_u8(b'S');
error.put_slice(&b"FATAL\0"[..]);
// Error level (non-translatable)
error.put_u8(b'V');
error.put_slice(&b"FATAL\0"[..]);
// Error code: not sure how much this matters.
error.put_u8(b'C');
error.put_slice(&b"58000\0"[..]); // system_error, see Appendix A.
// The short error message.
error.put_u8(b'M');
error.put_slice(format!("{}\0", message).as_bytes());
// No more fields follow.
error.put_u8(0);
// Compose the two message reply.
let mut res = BytesMut::with_capacity(error.len() + 5);
res.put_u8(b'E');
res.put_i32(error.len() as i32 + 4);
res.put(error);
write_all_half(stream, &res).await
}
2022-06-27 09:46:33 -07:00
pub async fn wrong_password<S>(stream: &mut S, user: &str) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
let mut error = BytesMut::new();
// Error level
error.put_u8(b'S');
error.put_slice(&b"FATAL\0"[..]);
// Error level (non-translatable)
error.put_u8(b'V');
error.put_slice(&b"FATAL\0"[..]);
// Error code: not sure how much this matters.
error.put_u8(b'C');
error.put_slice(&b"28P01\0"[..]); // system_error, see Appendix A.
// The short error message.
error.put_u8(b'M');
error.put_slice(format!("password authentication failed for user \"{}\"\0", user).as_bytes());
// No more fields follow.
error.put_u8(0);
// Compose the two message reply.
let mut res = BytesMut::new();
res.put_u8(b'E');
res.put_i32(error.len() as i32 + 4);
res.put(error);
write_all(stream, res).await
}
/// Respond to a SHOW SHARD command.
2022-06-27 15:52:01 -07:00
pub async fn show_response<S>(stream: &mut S, name: &str, value: &str) -> Result<(), Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
// A SELECT response consists of:
// 1. RowDescription
// 2. One or more DataRow
// 3. CommandComplete
// 4. ReadyForQuery
2022-03-01 08:47:19 -08:00
// The final messages sent to the client
let mut res = BytesMut::new();
2022-03-01 08:47:19 -08:00
// RowDescription
res.put(row_description(&vec![(name, DataType::Text)]));
2022-03-01 08:47:19 -08:00
// DataRow
res.put(data_row(&vec![value.to_string()]));
2022-03-01 08:47:19 -08:00
// CommandComplete
res.put(command_complete("SELECT 1"));
write_all_half(stream, &res).await?;
ready_for_query(stream).await
2022-03-01 08:47:19 -08:00
}
2022-03-01 08:47:19 -08:00
pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut {
let mut res = BytesMut::new();
let mut row_desc = BytesMut::new();
// how many columns we are storing
2022-03-01 08:47:19 -08:00
row_desc.put_i16(columns.len() as i16);
2022-03-01 08:47:19 -08:00
for (name, data_type) in columns {
// Column name
row_desc.put_slice(format!("{}\0", name).as_bytes());
2022-03-01 08:47:19 -08:00
// Doesn't belong to any table
row_desc.put_i32(0);
2022-03-01 08:47:19 -08:00
// Doesn't belong to any table
row_desc.put_i16(0);
2022-03-01 08:47:19 -08:00
// Text
row_desc.put_i32(data_type.into());
2022-03-01 08:47:19 -08:00
// Text size = variable (-1)
let type_size = match data_type {
DataType::Text => -1,
DataType::Int4 => 4,
DataType::Numeric => -1,
DataType::Bool => 1,
DataType::Oid => 4,
DataType::AnyArray => -1,
DataType::Any => -1,
2022-03-01 08:47:19 -08:00
};
2022-03-01 08:47:19 -08:00
row_desc.put_i16(type_size);
2022-03-01 08:47:19 -08:00
// Type modifier: none that I know
row_desc.put_i32(-1);
2022-03-01 08:47:19 -08:00
// Format being used: text (0), binary (1)
row_desc.put_i16(0);
}
res.put_u8(b'T');
res.put_i32(row_desc.len() as i32 + 4);
res.put(row_desc);
2022-03-01 08:47:19 -08:00
res
}
/// Create a DataRow message.
2022-03-01 08:47:19 -08:00
pub fn data_row(row: &Vec<String>) -> BytesMut {
let mut res = BytesMut::new();
let mut data_row = BytesMut::new();
data_row.put_i16(row.len() as i16);
for column in row {
let column = column.as_bytes();
data_row.put_i32(column.len() as i32);
data_row.put_slice(column);
2022-03-01 08:47:19 -08:00
}
res.put_u8(b'D');
res.put_i32(data_row.len() as i32 + 4);
res.put(data_row);
2022-03-01 08:47:19 -08:00
res
}
pub fn data_row_nullable(row: &Vec<Option<String>>) -> BytesMut {
let mut res = BytesMut::new();
let mut data_row = BytesMut::new();
data_row.put_i16(row.len() as i16);
for column in row {
if let Some(column) = column {
let column = column.as_bytes();
data_row.put_i32(column.len() as i32);
data_row.put_slice(column);
} else {
data_row.put_i32(-1 as i32);
}
}
res.put_u8(b'D');
res.put_i32(data_row.len() as i32 + 4);
res.put(data_row);
res
}
/// Create a CommandComplete message.
2022-03-01 08:47:19 -08:00
pub fn command_complete(command: &str) -> BytesMut {
let cmd = BytesMut::from(format!("{}\0", command).as_bytes());
let mut res = BytesMut::new();
res.put_u8(b'C');
res.put_i32(cmd.len() as i32 + 4);
res.put(cmd);
res
}
2022-02-08 09:33:20 -08:00
/// Write all data in the buffer to the TcpStream.
2022-06-27 09:46:33 -07:00
pub async fn write_all<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
2022-02-03 13:35:40 -08:00
match stream.write_all(&buf).await {
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
2022-02-03 13:35:40 -08:00
}
2022-02-03 13:54:07 -08:00
}
2022-02-08 09:33:20 -08:00
/// Write all the data in the buffer to the TcpStream, write owned half (see mpsc).
pub async fn write_all_half<S>(stream: &mut S, buf: &BytesMut) -> Result<(), Error>
2022-06-27 15:52:01 -07:00
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
match stream.write_all(buf).await {
2022-02-03 15:17:04 -08:00
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
2022-02-03 15:17:04 -08:00
}
}
pub async fn write_all_flush<S>(stream: &mut S, buf: &[u8]) -> Result<(), Error>
where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
match stream.write_all(buf).await {
Ok(_) => match stream.flush().await {
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error flushing socket - Error: {:?}",
err
)))
}
},
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
}
}
2022-02-03 13:54:07 -08:00
/// Read a complete message from the socket.
2022-06-27 15:52:01 -07:00
pub async fn read_message<S>(stream: &mut S) -> Result<BytesMut, Error>
where
S: tokio::io::AsyncRead + std::marker::Unpin,
{
2022-02-03 13:54:07 -08:00
let code = match stream.read_u8().await {
Ok(code) => code,
Err(err) => {
return Err(Error::SocketError(format!(
"Error reading message code from socket - Error {:?}",
err
)))
}
2022-02-03 13:54:07 -08:00
};
let len = match stream.read_i32().await {
Ok(len) => len,
Err(err) => {
return Err(Error::SocketError(format!(
"Error reading message len from socket - Code: {:?}, Error: {:?}",
code, err
)))
}
2022-02-03 13:54:07 -08:00
};
let mut bytes = BytesMut::with_capacity(len as usize + 1);
bytes.put_u8(code);
bytes.put_i32(len);
bytes.resize(bytes.len() + len as usize - mem::size_of::<i32>(), b'0');
2022-02-03 13:54:07 -08:00
let slice_start = mem::size_of::<u8>() + mem::size_of::<i32>();
let slice_end = slice_start + len as usize - mem::size_of::<i32>();
// Avoids a panic
if slice_end < slice_start {
return Err(Error::SocketError(format!(
"Error reading message from socket - Code: {:?} - Length {:?}, Error: {:?}",
code, len, "Unexpected length value for message"
)));
}
match stream.read_exact(&mut bytes[slice_start..slice_end]).await {
2022-02-03 13:54:07 -08:00
Ok(_) => (),
Err(err) => {
return Err(Error::SocketError(format!(
"Error reading message from socket - Code: {:?}, Error: {:?}",
code, err
)))
}
2022-02-03 13:54:07 -08:00
};
Ok(bytes)
2022-02-03 15:17:04 -08:00
}
pub fn server_parameter_message(key: &str, value: &str) -> BytesMut {
let mut server_info = BytesMut::new();
let null_byte_size = 1;
let len: usize =
mem::size_of::<i32>() + key.len() + null_byte_size + value.len() + null_byte_size;
server_info.put_slice("S".as_bytes());
server_info.put_i32(len.try_into().unwrap());
server_info.put_slice(key.as_bytes());
server_info.put_bytes(0, 1);
server_info.put_slice(value.as_bytes());
server_info.put_bytes(0, 1);
server_info
}
pub fn configure_socket(stream: &TcpStream) {
let sock_ref = SockRef::from(stream);
let conf = get_config();
match sock_ref.set_keepalive(true) {
Ok(_) => {
match sock_ref.set_tcp_keepalive(
&TcpKeepalive::new()
.with_interval(Duration::from_secs(conf.general.tcp_keepalives_interval))
.with_retries(conf.general.tcp_keepalives_count)
.with_time(Duration::from_secs(conf.general.tcp_keepalives_idle)),
) {
Ok(_) => (),
Err(err) => error!("Could not configure socket: {}", err),
}
}
Err(err) => error!("Could not configure socket: {}", err),
}
}
pub trait BytesMutReader {
fn read_string(&mut self) -> Result<String, Error>;
}
impl BytesMutReader for Cursor<&BytesMut> {
/// Should only be used when reading strings from the message protocol.
/// Can be used to read multiple strings from the same message which are separated by the null byte
fn read_string(&mut self) -> Result<String, Error> {
let mut buf = vec![];
match self.read_until(b'\0', &mut buf) {
Ok(_) => Ok(String::from_utf8_lossy(&buf[..buf.len() - 1]).to_string()),
Err(err) => return Err(Error::ParseBytesError(err.to_string())),
}
}
}