Health check delay (#118)

* initial commit of server check delay implementation

* fmt

* spelling

* Update name to last_healthcheck and some comments

* Moved server tested stat to after require_healthcheck check

* Make health check delay configurable

* Rename to last_activity

* Fix typo

* Add debug log for healthcheck

* Add address to debug log
zainkabani
2022-08-11 17:42:40 -04:00
committed by GitHub
parent a262337ba5
commit f963b12821
8 changed files with 165 additions and 41 deletions
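
In short, the change works like this: every successful server send or receive refreshes a `last_activity` timestamp on the `Server`, and `ConnectionPool::get` only runs the `SELECT 1` health check when that timestamp is older than `healthcheck_delay` milliseconds. A minimal, hypothetical sketch of the gate (the function name is mine; the real logic lives in the pool hunk below):

```rust
use std::time::SystemTime;

/// Sketch of the gate this commit adds: skip the health check when the
/// connection has been active within the last `healthcheck_delay` ms.
fn requires_healthcheck(last_activity: SystemTime, healthcheck_delay_ms: u128) -> bool {
    // `elapsed()` errors only if the clock moved backwards past the stored
    // timestamp; the diff unwraps, this sketch errs on the side of checking.
    match last_activity.elapsed() {
        Ok(idle) => idle.as_millis() > healthcheck_delay_ms,
        Err(_) => true,
    }
}
```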

View File

@@ -20,6 +20,9 @@ connect_timeout = 100
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 100
# How long to keep a connection available for immediate re-use, without running a healthcheck query on it (ms)
healthcheck_delay = 30000
# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 5000
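
These keys map one-to-one onto fields of the `General` struct shown in the config hunk further down. A minimal sketch of parsing such a snippet, assuming the `serde` and `toml` crates (the struct here is a hypothetical subset of pgcat's):

```rust
use serde::Deserialize;

// Hypothetical subset of the `General` config struct from the diff below.
#[derive(Debug, Deserialize)]
struct General {
    connect_timeout: u64,
    healthcheck_timeout: u64,
    healthcheck_delay: u64,
    shutdown_timeout: u64,
}

fn main() {
    let snippet = r#"
        connect_timeout = 100
        healthcheck_timeout = 100
        healthcheck_delay = 30000
        shutdown_timeout = 5000
    "#;
    let general: General = toml::from_str(snippet).expect("valid TOML");
    assert_eq!(general.healthcheck_delay, 30_000);
}
```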

View File

@@ -48,6 +48,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
| `healthcheck_delay` | How long to keep a connection available for immediate re-use, without running a healthcheck query on it (milliseconds). | `30000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| | | |
| **`user`** | | |
@@ -252,6 +253,7 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process or by qu
| `connect_timeout` | yes |
| `healthcheck_timeout` | no |
| `shutdown_timeout` | no |
| `healthcheck_delay` | no |
| `ban_time` | no |
| `user` | yes |
| `shards` | yes |

View File

@@ -20,6 +20,9 @@ connect_timeout = 5000
# How much time to give the `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout = 1000
# How long to keep a connection available for immediate re-use, without running a healthcheck query on it (ms)
healthcheck_delay = 30000
# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 60000

View File

@@ -20,6 +20,9 @@ connect_timeout = 5000
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000
# How long to keep a connection available for immediate re-use, without running a healthcheck query on it (ms)
healthcheck_delay = 30000
# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 60000

View File

@@ -7,11 +7,11 @@ use tokio::net::TcpStream;
use tokio::sync::broadcast::Receiver;
use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::config::get_config;
use crate::config::{get_config, Address};
use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
use crate::pool::{get_pool, ClientServerMap};
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
use crate::query_router::{Command, QueryRouter};
use crate::server::Server;
use crate::stats::{get_reporter, Reporter};
@@ -246,7 +246,7 @@ where
}
}
/// Handle TLS connection negotation.
/// Handle TLS connection negotiation.
pub async fn startup_tls(
stream: TcpStream,
client_server_map: ClientServerMap,
@@ -259,14 +259,14 @@ pub async fn startup_tls(
let mut stream = match tls.acceptor.accept(stream).await {
Ok(stream) => stream,
// TLS negotitation failed.
// TLS negotiation failed.
Err(err) => {
error!("TLS negotiation failed: {:?}", err);
return Err(Error::TlsError);
}
};
// TLS negotitation successful.
// TLS negotiation successful.
// Continue with regular startup using encrypted connection.
match get_startup::<TlsStream<TcpStream>>(&mut stream).await {
// Got good startup message, proceeding like normal except we
@@ -540,21 +540,21 @@ where
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool =
match get_pool(self.target_pool_name.clone(), self.target_user_name.clone()) {
Some(pool) => pool,
None => {
error_response(
&mut self.write,
&format!(
"No pool configured for database: {:?}, user: {:?}",
self.target_pool_name, self.target_user_name
),
)
.await?;
return Err(Error::ClientError);
}
};
let pool = match get_pool(self.target_pool_name.clone(), self.target_user_name.clone())
{
Some(pool) => pool,
None => {
error_response(
&mut self.write,
&format!(
"No pool configured for database: {:?}, user: {:?}",
self.target_pool_name, self.target_user_name
),
)
.await?;
return Err(Error::ClientError);
}
};
query_router.update_pool_settings(pool.settings.clone());
let current_shard = query_router.shard();
@@ -731,12 +731,26 @@ where
'Q' => {
debug!("Sending query to server");
server.send(original).await?;
self.send_server_message(
server,
original,
&address,
query_router.shard(),
&pool,
)
.await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196-byte chunks.
loop {
let response = server.recv().await?;
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
// Send server reply to the client.
match write_all_half(&mut self.write, response).await {
@@ -816,14 +830,28 @@ where
self.buffer.put(&original[..]);
server.send(self.buffer.clone()).await?;
self.send_server_message(
server,
self.buffer.clone(),
&address,
query_router.shard(),
&pool,
)
.await?;
self.buffer.clear();
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196-byte chunks.
loop {
let response = server.recv().await?;
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
@@ -857,15 +885,31 @@ where
'd' => {
// Forward the data to the server,
// don't buffer it since it can be rather large.
server.send(original).await?;
self.send_server_message(
server,
original,
&address,
query_router.shard(),
&pool,
)
.await?;
}
// CopyDone or CopyFail
// Copy is done, successfully or not.
'c' | 'f' => {
server.send(original).await?;
self.send_server_message(
server,
original,
&address,
query_router.shard(),
&pool,
)
.await?;
let response = server.recv().await?;
let response = self
.receive_server_message(server, &address, query_router.shard(), &pool)
.await?;
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
@@ -907,6 +951,39 @@ where
let mut guard = self.client_server_map.lock();
guard.remove(&(self.process_id, self.secret_key));
}
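/// Send a message to the server, banning the server's address on failure
/// so the pool stops routing new transactions to it.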
async fn send_server_message(
&self,
server: &mut Server,
message: BytesMut,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<(), Error> {
match server.send(message).await {
Ok(_) => Ok(()),
Err(err) => {
pool.ban(address, shard, self.process_id);
Err(err)
}
}
}
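/// Receive a message from the server, banning the server's address on
/// failure before propagating the error.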
async fn receive_server_message(
&self,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<BytesMut, Error> {
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
Err(err)
}
}
}
}
impl<S, T> Drop for Client<S, T> {

View File

@@ -121,6 +121,7 @@ pub struct General {
pub connect_timeout: u64,
pub healthcheck_timeout: u64,
pub shutdown_timeout: u64,
pub healthcheck_delay: u64,
pub ban_time: i64,
pub autoreload: bool,
pub tls_certificate: Option<String>,
@@ -138,6 +139,7 @@ impl Default for General {
connect_timeout: 5000,
healthcheck_timeout: 1000,
shutdown_timeout: 60000,
healthcheck_delay: 30000,
ban_time: 60,
autoreload: false,
tls_certificate: None,
@@ -281,6 +283,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
"shutdown_timeout".to_string(),
config.general.shutdown_timeout.to_string(),
),
(
"healthcheck_delay".to_string(),
config.general.healthcheck_delay.to_string(),
),
("ban_time".to_string(), config.general.ban_time.to_string()),
];
@@ -299,6 +305,7 @@ impl Config {
);
info!("Connection timeout: {}ms", self.general.connect_timeout);
info!("Shutdown timeout: {}ms", self.general.shutdown_timeout);
info!("Healthcheck delay: {}ms", self.general.healthcheck_delay);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);

View File

@@ -251,7 +251,7 @@ impl ConnectionPool {
/// Get a connection from the pool.
pub async fn get(
&mut self,
&self,
shard: usize, // shard number
role: Option<Role>, // primary or replica
process_id: i32, // client id
@@ -283,6 +283,9 @@ impl ConnectionPool {
return Err(Error::BadConfig);
}
let healthcheck_timeout = get_config().general.healthcheck_timeout;
let healthcheck_delay = get_config().general.healthcheck_delay as u128;
while allowed_attempts > 0 {
// Round-robin replicas.
round_robin += 1;
@@ -312,7 +315,7 @@ impl ConnectionPool {
Ok(conn) => conn,
Err(err) => {
error!("Banning replica {}, error: {:?}", index, err);
self.ban(address, shard);
self.ban(address, shard, process_id);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
@@ -322,8 +325,19 @@ impl ConnectionPool {
// Check if this server is alive with a health check.
let server = &mut *conn;
let healthcheck_timeout = get_config().general.healthcheck_timeout;
// `elapsed()` will return an error if the timestamp is later than the current system time, which should never happen here
let require_healthcheck =
server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay;
if !require_healthcheck {
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
return Ok((conn, address.clone()));
}
debug!("Running health check for replica {}, {:?}", index, address);
self.stats.server_tested(server.process_id(), address.id);
match tokio::time::timeout(
@@ -348,10 +362,7 @@ impl ConnectionPool {
// Don't leave a bad connection in the pool.
server.mark_bad();
self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.ban(address, shard, process_id);
continue;
}
},
@@ -362,10 +373,7 @@ impl ConnectionPool {
// Don't leave a bad connection in the pool.
server.mark_bad();
self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.ban(address, shard, process_id);
continue;
}
}
@@ -377,7 +385,11 @@ impl ConnectionPool {
/// Ban an address (i.e. replica). It no longer will serve
/// traffic for any new transactions. Existing transactions on that replica
/// will finish successfully or error out to the clients.
pub fn ban(&self, address: &Address, shard: usize) {
pub fn ban(&self, address: &Address, shard: usize, process_id: i32) {
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(Instant::now().elapsed().as_micros(), process_id, address.id);
error!("Banning {:?}", address);
let now = chrono::offset::Utc::now().naive_utc();
let mut guard = self.banlist.write();
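
When the health check does run, it is still bounded by `healthcheck_timeout` via `tokio::time::timeout`, as at the top of the hunk above. A standalone sketch of that pattern, with a dummy async query standing in for the real `SELECT 1` round trip:

```rust
use std::time::Duration;

// Dummy stand-in for the server's `SELECT 1` round trip.
async fn health_check_query() -> Result<(), ()> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    let healthcheck_timeout = Duration::from_millis(1000);

    // Mirrors ConnectionPool::get: a check that times out or fails gets
    // the connection marked bad and the address banned.
    match tokio::time::timeout(healthcheck_timeout, health_check_query()).await {
        Ok(Ok(())) => println!("healthy: hand out the connection"),
        Ok(Err(_)) => println!("query failed: mark bad and ban"),
        Err(_) => println!("timed out: mark bad and ban"),
    }
}
```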

View File

@@ -2,6 +2,7 @@
/// Here we are pretending to be a Postgres client.
use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error, info, trace};
use std::time::SystemTime;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -58,6 +59,9 @@ pub struct Server {
/// Application name using the server at the moment.
application_name: String,
// Last time a successful send to or receive from the server happened
last_activity: SystemTime,
}
impl Server {
@@ -316,6 +320,7 @@ impl Server {
connected_at: chrono::offset::Utc::now().naive_utc(),
stats: stats,
application_name: String::new(),
last_activity: SystemTime::now(),
};
server.set_name("pgcat").await?;
@@ -366,7 +371,11 @@ impl Server {
.data_sent(messages.len(), self.process_id, self.address.id);
match write_all_half(&mut self.write, messages).await {
Ok(_) => Ok(()),
Ok(_) => {
// Successfully sent to server
self.last_activity = SystemTime::now();
Ok(())
}
Err(err) => {
error!("Terminating server because of: {:?}", err);
self.bad = true;
@@ -413,7 +422,7 @@ impl Server {
self.in_transaction = false;
}
// Some error occured, the transaction was rolled back.
// Some error occurred, the transaction was rolled back.
'E' => {
self.in_transaction = true;
}
@@ -474,6 +483,9 @@ impl Server {
// Clear the buffer for next query.
self.buffer.clear();
// Successfully received data from server
self.last_activity = SystemTime::now();
// Pass the data back to the client.
Ok(bytes)
}
@@ -564,6 +576,11 @@ impl Server {
pub fn process_id(&self) -> i32 {
self.process_id
}
// Get the server's last activity timestamp
pub fn last_activity(&self) -> SystemTime {
self.last_activity
}
}
impl Drop for Server {
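
Putting both halves together: the server refreshes `last_activity` on every successful send and receive, and the pool reads it through `last_activity()` to decide whether a health check is due. A compressed, hypothetical stand-in for that interaction:

```rust
use std::time::{Duration, SystemTime};

// Hypothetical trimmed-down stand-in for pgcat's `Server`.
struct Server {
    last_activity: SystemTime,
}

impl Server {
    fn new() -> Self {
        Server { last_activity: SystemTime::now() }
    }

    // Mirrors the diff: refresh the timestamp after a successful send/recv.
    fn mark_active(&mut self) {
        self.last_activity = SystemTime::now();
    }

    fn last_activity(&self) -> SystemTime {
        self.last_activity
    }
}

fn main() {
    let mut server = Server::new();
    server.mark_active();

    let healthcheck_delay = Duration::from_millis(30_000);
    let idle = server.last_activity().elapsed().unwrap_or(Duration::MAX);

    if idle > healthcheck_delay {
        println!("idle too long: run `SELECT 1` before re-use");
    } else {
        println!("recently active: hand the connection out immediately");
    }
}
```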