PgCat Query Mirroring (#341)

This is an implementation of query mirroring in PgCat (as outlined in #302).

In configs, we match mirror hosts with the servers handling the traffic. A mirror host will receive the same protocol messages as the main server it was matched with.

This is done by creating an async task for each mirror server. The task communicates with the main server through two channels: one for the protocol messages and one for the exit signal. The mirror task sends the protocol packets to the underlying PostgreSQL server. We receive from the underlying PostgreSQL server as soon as data is available and immediately discard it. We use bb8 to manage the life cycle of the connection, not for pooling, since each mirror server handler is more or less single-threaded.

We don't have any connection pooling in the mirrors. Matching each mirror connection to an actual server connection guarantees that we will not have more connections to any of the mirrors than the parent pool would allow.
This commit is contained in:
Mostafa Abdelraouf
2023-03-10 06:23:51 -06:00
committed by GitHub
parent c0855bf27d
commit aa89e357e0
17 changed files with 370 additions and 23 deletions

View File

@@ -1,4 +1,3 @@
use crate::config::Role;
use crate::pool::BanReason;
/// Admin database.
use bytes::{Buf, BufMut, BytesMut};

View File

@@ -29,6 +29,8 @@ pub enum Role {
Primary,
#[serde(alias = "replica", alias = "Replica")]
Replica,
#[serde(alias = "mirror", alias = "Mirror")]
Mirror,
}
impl ToString for Role {
@@ -36,6 +38,7 @@ impl ToString for Role {
match *self {
Role::Primary => "primary".to_string(),
Role::Replica => "replica".to_string(),
Role::Mirror => "mirror".to_string(),
}
}
}
@@ -90,6 +93,9 @@ pub struct Address {
/// The name of this pool (i.e. database name visible to the client).
pub pool_name: String,
/// List of addresses to receive mirrored traffic.
pub mirrors: Vec<Address>,
}
impl Default for Address {
@@ -105,6 +111,7 @@ impl Default for Address {
role: Role::Replica,
username: String::from("username"),
pool_name: String::from("pool_name"),
mirrors: Vec::new(),
}
}
}
@@ -114,11 +121,14 @@ impl Address {
/// Builds the display name of this address.
///
/// The name always starts with `<pool_name>_shard_<shard>` and ends with a
/// role-specific suffix: `_primary` for primaries, `_replica_<replica_number>`
/// for replicas and `_mirror_<replica_number>` for mirrors.
pub fn name(&self) -> String {
    let shard_prefix = format!("{}_shard_{}", self.pool_name, self.shard);
    match self.role {
        Role::Primary => format!("{}_primary", shard_prefix),
        Role::Replica => format!("{}_replica_{}", shard_prefix, self.replica_number),
        Role::Mirror => format!("{}_mirror_{}", shard_prefix, self.replica_number),
    }
}
}
@@ -465,11 +475,19 @@ pub struct ServerConfig {
pub role: Role,
}
/// Configuration of a single mirror host attached to a shard.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)]
pub struct MirrorServerConfig {
/// Host name of the mirror server.
pub host: String,
/// Port of the mirror server.
pub port: u16,
/// Index (position in the shard's `servers` list) of the server whose
/// traffic this mirror receives.
pub mirroring_target_index: usize,
}
/// Shard configuration.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Hash, Eq)]
pub struct Shard {
/// Database name used for the addresses built from this shard.
pub database: String,
/// Servers that belong to this shard.
pub servers: Vec<ServerConfig>,
/// Optional mirror hosts. Each entry is matched to one entry of `servers`
/// through its `mirroring_target_index`.
pub mirrors: Option<Vec<MirrorServerConfig>>,
}
impl Shard {
@@ -518,6 +536,7 @@ impl Default for Shard {
port: 5432,
role: Role::Primary,
}],
mirrors: None,
database: String::from("postgres"),
}
}

View File

@@ -2,6 +2,7 @@ pub mod config;
pub mod constants;
pub mod errors;
pub mod messages;
pub mod mirrors;
pub mod multi_logger;
pub mod pool;
pub mod scram;

View File

@@ -66,6 +66,7 @@ mod config;
mod constants;
mod errors;
mod messages;
mod mirrors;
mod multi_logger;
mod pool;
mod prometheus;

169
src/mirrors.rs Normal file
View File

@@ -0,0 +1,169 @@
//! A mirrored PostgreSQL client.
//! Packets reach us through a channel from the main client and we forward them to the mirror server.
use bb8::Pool;
use bytes::{Bytes, BytesMut};
use crate::config::{get_config, Address, Role, User};
use crate::pool::{ClientServerMap, ServerPool};
use crate::stats::get_reporter;
use log::{error, info, trace, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender};
/// State owned by one mirror task: where to connect and the channels it is
/// driven by.
pub struct MirroredClient {
// Address of the mirror server; handed to `ServerPool::new` when the pool is created.
address: Address,
// User handed to `ServerPool::new` for the mirror connection.
user: User,
// Database name handed to `ServerPool::new` for the mirror connection.
database: String,
// Protocol messages to forward to the mirror server.
bytes_rx: Receiver<Bytes>,
// Exit signal; a received value (or a closed channel) stops the task.
disconnect_rx: Receiver<()>,
}
impl MirroredClient {
/// Creates the single-connection bb8 pool used to talk to the mirror server.
///
/// The connection and idle timeouts (in milliseconds) are read from the
/// configuration of the pool named `self.address.pool_name`; each one falls
/// back to 10 seconds when the pool or the setting is missing.
///
/// # Panics
///
/// Panics if bb8 fails to build the pool.
async fn create_pool(&self) -> Pool<ServerPool> {
    // Fallback for both timeouts, in milliseconds.
    let fallback_ms: u64 = 10_000;

    let config = get_config();
    let pool_settings = config.pools.get(&self.address.pool_name);
    let connect_ms = pool_settings
        .and_then(|cfg| cfg.connect_timeout)
        .unwrap_or(fallback_ms);
    let idle_ms = pool_settings
        .and_then(|cfg| cfg.idle_timeout)
        .unwrap_or(fallback_ms);

    let manager = ServerPool::new(
        self.address.clone(),
        self.user.clone(),
        self.database.as_str(),
        ClientServerMap::default(),
        get_reporter(),
    );

    // bb8 is used for connection life-cycle management only, hence a
    // maximum size of one and no health check on checkout.
    let builder = Pool::builder()
        .max_size(1)
        .connection_timeout(std::time::Duration::from_millis(connect_ms))
        .idle_timeout(Some(std::time::Duration::from_millis(idle_ms)))
        .test_on_check_out(false);

    builder.build(manager).await.unwrap()
}
pub fn start(mut self) {
tokio::spawn(async move {
let pool = self.create_pool().await;
let address = self.address.clone();
loop {
let mut server = match pool.get().await {
Ok(server) => server,
Err(err) => {
error!(
"Failed to get connection from pool, Discarding message {:?}, {:?}",
err,
address.clone()
);
continue;
}
};
tokio::select! {
// Exit channel events
_ = self.disconnect_rx.recv() => {
info!("Got mirror exit signal, exiting {:?}", address.clone());
break;
}
// Incoming data from server (we read to clear the socket buffer and discard the data)
recv_result = server.recv() => {
match recv_result {
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
Err(err) => {
server.mark_bad();
error!("Failed to receive from mirror {:?} {:?}", err, address.clone());
}
}
}
// Messages to send to the server
message = self.bytes_rx.recv() => {
match message {
Some(bytes) => {
match server.send(&BytesMut::from(&bytes[..])).await {
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()),
Err(err) => {
server.mark_bad();
error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone())
}
}
}
None => {
info!("Mirror channel closed, exiting {:?}", address.clone());
break;
},
}
}
}
}
});
}
}
/// Sending side of the mirror tasks: holds the channel senders used to feed
/// and stop every mirror task created by `from_addresses`.
pub struct MirroringManager {
/// One sender per mirror task, used to forward protocol messages.
pub byte_senders: Vec<Sender<Bytes>>,
/// One sender per mirror task, used to deliver the exit signal.
pub disconnect_senders: Vec<Sender<()>>,
}
impl MirroringManager {
pub fn from_addresses(
user: User,
database: String,
addresses: Vec<Address>,
) -> MirroringManager {
let mut byte_senders: Vec<Sender<Bytes>> = vec![];
let mut exit_senders: Vec<Sender<()>> = vec![];
addresses.iter().for_each(|mirror| {
let (bytes_tx, bytes_rx) = channel::<Bytes>(500);
let (exit_tx, exit_rx) = channel::<()>(1);
let mut addr = mirror.clone();
addr.role = Role::Mirror;
let client = MirroredClient {
user: user.clone(),
database: database.to_owned(),
address: addr,
bytes_rx,
disconnect_rx: exit_rx,
};
exit_senders.push(exit_tx.clone());
byte_senders.push(bytes_tx.clone());
client.start();
});
Self {
byte_senders: byte_senders,
disconnect_senders: exit_senders,
}
}
pub fn send(self: &mut Self, bytes: &BytesMut) {
let cpy = bytes.clone().freeze();
self.byte_senders
.iter_mut()
.for_each(|sender| match sender.try_send(cpy.clone()) {
Ok(_) => {}
Err(err) => {
warn!("Failed to send bytes to a mirror channel {}", err);
}
});
}
/// Sends the exit signal to every mirror task without blocking.
///
/// A signal that cannot be queued is not retried; a warning is logged
/// instead.
pub fn disconnect(&mut self) {
    for sender in &self.disconnect_senders {
        if let Err(err) = sender.try_send(()) {
            warn!(
                "Failed to send disconnect signal to a mirror channel {}",
                err
            );
        }
    }
}
}

View File

@@ -193,7 +193,7 @@ impl ConnectionPool {
let config = get_config();
let mut new_pools = HashMap::new();
let mut address_id = 0;
let mut address_id: usize = 0;
for (pool_name, pool_config) in &config.pools {
let new_pool_hash_value = pool_config.hash_value();
@@ -244,7 +244,33 @@ impl ConnectionPool {
let mut servers = Vec::new();
let mut replica_number = 0;
// Load Mirror settings
for (address_index, server) in shard.servers.iter().enumerate() {
let mut mirror_addresses = vec![];
if let Some(mirror_settings_vec) = &shard.mirrors {
for (mirror_idx, mirror_settings) in
mirror_settings_vec.iter().enumerate()
{
if mirror_settings.mirroring_target_index != address_index {
continue;
}
mirror_addresses.push(Address {
id: address_id,
database: shard.database.clone(),
host: mirror_settings.host.clone(),
port: mirror_settings.port,
role: server.role,
address_index: mirror_idx,
replica_number,
shard: shard_idx.parse::<usize>().unwrap(),
username: user.username.clone(),
pool_name: pool_name.clone(),
mirrors: vec![],
});
address_id += 1;
}
}
let address = Address {
id: address_id,
database: shard.database.clone(),
@@ -256,6 +282,7 @@ impl ConnectionPool {
shard: shard_idx.parse::<usize>().unwrap(),
username: user.username.clone(),
pool_name: pool_name.clone(),
mirrors: mirror_addresses,
};
address_id += 1;

View File

@@ -223,6 +223,7 @@ impl QueryRouter {
Command::ShowServerRole => match self.active_role {
Some(Role::Primary) => Role::Primary.to_string(),
Some(Role::Replica) => Role::Replica.to_string(),
Some(Role::Mirror) => Role::Mirror.to_string(),
None => {
if self.query_parser_enabled() {
String::from("auto")

View File

@@ -14,6 +14,7 @@ use crate::config::{Address, User};
use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
use crate::mirrors::MirroringManager;
use crate::pool::ClientServerMap;
use crate::scram::ScramSha256;
use crate::stats::Reporter;
@@ -68,6 +69,8 @@ pub struct Server {
// Last time that a successful server send or response happened
last_activity: SystemTime,
mirror_manager: Option<MirroringManager>,
}
impl Server {
@@ -334,6 +337,14 @@ impl Server {
stats,
application_name: String::new(),
last_activity: SystemTime::now(),
mirror_manager: match address.mirrors.len() {
0 => None,
_ => Some(MirroringManager::from_addresses(
user.clone(),
database.to_owned(),
address.mirrors.clone(),
)),
},
};
server.set_name("pgcat").await?;
@@ -384,6 +395,7 @@ impl Server {
/// Send messages to the server from the client.
pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> {
self.mirror_send(messages);
self.stats.data_sent(messages.len(), self.server_id);
match write_all_half(&mut self.write, messages).await {
@@ -674,6 +686,20 @@ impl Server {
pub fn mark_dirty(&mut self) {
// Only raises the `needs_cleanup` flag; the code that acts on the flag is
// not part of this excerpt.
self.needs_cleanup = true;
}
/// Forwards `bytes` to the mirrors of this server, if it has any.
///
/// Does nothing when no mirror manager is configured.
pub fn mirror_send(&mut self, bytes: &BytesMut) {
    if let Some(manager) = self.mirror_manager.as_mut() {
        manager.send(bytes);
    }
}
/// Signals the mirrors of this server to stop, if it has any.
///
/// Does nothing when no mirror manager is configured.
pub fn mirror_disconnect(&mut self) {
    if let Some(manager) = self.mirror_manager.as_mut() {
        manager.disconnect();
    }
}
}
impl Drop for Server {
@@ -681,6 +707,7 @@ impl Drop for Server {
/// the socket is in non-blocking mode, so it may not be ready
/// for a write.
fn drop(&mut self) {
self.mirror_disconnect();
self.stats.server_disconnecting(self.server_id);
let mut bytes = BytesMut::with_capacity(4);