2022-06-24 14:52:38 -07:00
|
|
|
use arc_swap::ArcSwap;
|
2022-02-03 16:25:05 -08:00
|
|
|
use async_trait::async_trait;
|
2022-02-05 18:20:53 -08:00
|
|
|
use bb8::{ManageConnection, Pool, PooledConnection};
|
2022-02-11 22:19:49 -08:00
|
|
|
use bytes::BytesMut;
|
2022-02-05 10:02:13 -08:00
|
|
|
use chrono::naive::NaiveDateTime;
|
2022-02-24 08:44:41 -08:00
|
|
|
use log::{debug, error, info, warn};
|
2022-06-24 14:52:38 -07:00
|
|
|
use once_cell::sync::Lazy;
|
2022-02-24 08:44:41 -08:00
|
|
|
use parking_lot::{Mutex, RwLock};
|
2022-03-10 01:33:29 -08:00
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::Instant;
|
2022-02-03 16:25:05 -08:00
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
use crate::config::{get_config, Address, Role, Shard, User};
|
2022-02-03 16:25:05 -08:00
|
|
|
use crate::errors::Error;
|
2022-07-27 21:47:55 -05:00
|
|
|
|
2022-02-03 17:06:19 -08:00
|
|
|
use crate::server::Server;
|
2022-06-24 14:52:38 -07:00
|
|
|
use crate::stats::{get_reporter, Reporter};
|
2022-02-05 10:02:13 -08:00
|
|
|
|
2022-02-24 08:44:41 -08:00
|
|
|
/// Banned addresses, one map per shard: address -> time the ban was issued.
pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;

/// Shared map of (i32, i32) client identifiers to (i32, i32, String, String)
/// server identifiers. NOTE(review): presumably (process_id, secret_key) pairs
/// plus host/port for query cancellation -- confirm against `Server::startup`.
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;

/// All connection pools, keyed by (pool name, username).
pub type PoolMap = HashMap<(String, String), ConnectionPool>;

/// The connection pool, globally available.
/// This is atomic and safe and read-optimized.
/// The pool is recreated dynamically when the config is reloaded.
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
|
|
|
|
|
|
|
|
|
|
/// Settings shared by all connections in a pool,
/// taken from the pool's section of the configuration.
#[derive(Clone, Debug)]
pub struct PoolSettings {
    // Pooling mode, e.g. "transaction" (the default).
    pub pool_mode: String,

    // Shard configurations, keyed by shard number (stored as a string).
    pub shards: HashMap<String, Shard>,

    // The user this pool logs in as.
    pub user: User,

    // Role used when the client does not request a specific one
    // (default: "any").
    pub default_role: String,

    // Whether the query parser is enabled for this pool.
    pub query_parser_enabled: bool,

    // Whether the primary may also serve read queries.
    pub primary_reads_enabled: bool,

    // Sharding function name, e.g. "pg_bigint_hash" (the default).
    pub sharding_function: String,
}
|
|
|
|
|
impl Default for PoolSettings {
|
|
|
|
|
fn default() -> PoolSettings {
|
|
|
|
|
PoolSettings {
|
|
|
|
|
pool_mode: String::from("transaction"),
|
|
|
|
|
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
|
|
|
|
user: User::default(),
|
|
|
|
|
default_role: String::from("any"),
|
|
|
|
|
query_parser_enabled: false,
|
|
|
|
|
primary_reads_enabled: true,
|
|
|
|
|
sharding_function: "pg_bigint_hash".to_string(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-06-24 14:52:38 -07:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// The globally accessible connection pool.
#[derive(Clone, Debug, Default)]
pub struct ConnectionPool {
    /// The pools handled internally by bb8.
    /// Indexed by shard, then by server within the shard,
    /// parallel to `addresses`.
    databases: Vec<Vec<Pool<ServerPool>>>,

    /// The addresses (host, port, role) to handle
    /// failover and load balancing deterministically.
    addresses: Vec<Vec<Address>>,

    /// List of banned addresses (see above)
    /// that should not be queried.
    banlist: BanList,

    /// The statistics aggregator runs in a separate task
    /// and receives stats from clients, servers, and the pool.
    stats: Reporter,

    /// The server information (K messages) have to be passed to the
    /// clients on startup. We pre-connect to all shards and replicas
    /// on pool creation and save the K messages here.
    server_info: BytesMut,

    /// The configuration-derived settings this pool was built from.
    pub settings: PoolSettings,
}
|
|
|
|
|
|
2022-02-05 18:20:53 -08:00
|
|
|
impl ConnectionPool {
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Construct the connection pool from the configuration.
|
2022-06-24 14:52:38 -07:00
|
|
|
pub async fn from_config(client_server_map: ClientServerMap) -> Result<(), Error> {
|
2022-02-19 13:57:35 -08:00
|
|
|
let config = get_config();
|
2022-07-27 21:47:55 -05:00
|
|
|
let mut new_pools = PoolMap::default();
|
2022-06-24 14:52:38 -07:00
|
|
|
|
2022-03-04 17:04:27 -08:00
|
|
|
let mut address_id = 0;
|
2022-07-27 21:47:55 -05:00
|
|
|
for (pool_name, pool_config) in &config.pools {
|
|
|
|
|
for (_user_index, user_info) in &pool_config.users {
|
|
|
|
|
let mut shards = Vec::new();
|
|
|
|
|
let mut addresses = Vec::new();
|
|
|
|
|
let mut banlist = Vec::new();
|
|
|
|
|
let mut shard_ids = pool_config
|
|
|
|
|
.shards
|
|
|
|
|
.clone()
|
|
|
|
|
.into_keys()
|
|
|
|
|
.map(|x| x.to_string())
|
|
|
|
|
.collect::<Vec<String>>();
|
|
|
|
|
|
|
|
|
|
// Sort by shard number to ensure consistency.
|
|
|
|
|
shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap());
|
|
|
|
|
|
|
|
|
|
for shard_idx in shard_ids {
|
|
|
|
|
let shard = &pool_config.shards[&shard_idx];
|
|
|
|
|
let mut pools = Vec::new();
|
|
|
|
|
let mut servers = Vec::new();
|
|
|
|
|
let mut replica_number = 0;
|
|
|
|
|
|
|
|
|
|
for server in shard.servers.iter() {
|
|
|
|
|
let role = match server.2.as_ref() {
|
|
|
|
|
"primary" => Role::Primary,
|
|
|
|
|
"replica" => Role::Replica,
|
|
|
|
|
_ => {
|
|
|
|
|
error!("Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2);
|
|
|
|
|
Role::Replica
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let address = Address {
|
|
|
|
|
id: address_id,
|
2022-08-17 10:40:47 -05:00
|
|
|
database: shard.database.clone(),
|
2022-07-27 21:47:55 -05:00
|
|
|
host: server.0.clone(),
|
|
|
|
|
port: server.1.to_string(),
|
|
|
|
|
role: role,
|
|
|
|
|
replica_number,
|
|
|
|
|
shard: shard_idx.parse::<usize>().unwrap(),
|
2022-08-17 10:40:47 -05:00
|
|
|
username: user_info.username.clone(),
|
|
|
|
|
poolname: pool_name.clone(),
|
2022-07-27 21:47:55 -05:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
address_id += 1;
|
|
|
|
|
|
|
|
|
|
if role == Role::Replica {
|
|
|
|
|
replica_number += 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let manager = ServerPool::new(
|
|
|
|
|
address.clone(),
|
|
|
|
|
user_info.clone(),
|
|
|
|
|
&shard.database,
|
|
|
|
|
client_server_map.clone(),
|
|
|
|
|
get_reporter(),
|
|
|
|
|
);
|
2022-02-08 09:25:59 -08:00
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
let pool = Pool::builder()
|
|
|
|
|
.max_size(user_info.pool_size)
|
|
|
|
|
.connection_timeout(std::time::Duration::from_millis(
|
|
|
|
|
config.general.connect_timeout,
|
|
|
|
|
))
|
|
|
|
|
.test_on_check_out(false)
|
|
|
|
|
.build(manager)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
pools.push(pool);
|
|
|
|
|
servers.push(address);
|
|
|
|
|
}
|
2022-03-04 17:04:27 -08:00
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
shards.push(pools);
|
|
|
|
|
addresses.push(servers);
|
|
|
|
|
banlist.push(HashMap::new());
|
2022-03-01 22:49:43 -08:00
|
|
|
}
|
|
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
assert_eq!(shards.len(), addresses.len());
|
|
|
|
|
|
|
|
|
|
let mut pool = ConnectionPool {
|
|
|
|
|
databases: shards,
|
|
|
|
|
addresses: addresses,
|
|
|
|
|
banlist: Arc::new(RwLock::new(banlist)),
|
|
|
|
|
stats: get_reporter(),
|
|
|
|
|
server_info: BytesMut::new(),
|
|
|
|
|
settings: PoolSettings {
|
|
|
|
|
pool_mode: pool_config.pool_mode.clone(),
|
|
|
|
|
shards: pool_config.shards.clone(),
|
|
|
|
|
user: user_info.clone(),
|
|
|
|
|
default_role: pool_config.default_role.clone(),
|
|
|
|
|
query_parser_enabled: pool_config.query_parser_enabled.clone(),
|
|
|
|
|
primary_reads_enabled: pool_config.primary_reads_enabled,
|
|
|
|
|
sharding_function: pool_config.sharding_function.clone(),
|
|
|
|
|
},
|
|
|
|
|
};
|
2022-06-24 14:52:38 -07:00
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
// Connect to the servers to make sure pool configuration is valid
|
|
|
|
|
// before setting it globally.
|
|
|
|
|
match pool.validate().await {
|
|
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error!("Could not validate connection pool: {:?}", err);
|
|
|
|
|
return Err(err);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
new_pools.insert((pool_name.clone(), user_info.username.clone()), pool);
|
2022-06-24 14:52:38 -07:00
|
|
|
}
|
2022-07-27 21:47:55 -05:00
|
|
|
}
|
2022-06-24 14:52:38 -07:00
|
|
|
|
2022-07-27 21:47:55 -05:00
|
|
|
POOLS.store(Arc::new(new_pools.clone()));
|
2022-06-24 14:52:38 -07:00
|
|
|
|
|
|
|
|
Ok(())
|
2022-02-08 09:25:59 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-11 22:19:49 -08:00
|
|
|
/// Connect to all shards and grab server information.
|
|
|
|
|
/// Return server information we will pass to the clients
|
|
|
|
|
/// when they connect.
|
2022-02-12 09:24:24 -08:00
|
|
|
/// This also warms up the pool for clients that connect when
|
|
|
|
|
/// the pooler starts up.
|
2022-06-24 14:52:38 -07:00
|
|
|
async fn validate(&mut self) -> Result<(), Error> {
|
2022-02-11 22:19:49 -08:00
|
|
|
let mut server_infos = Vec::new();
|
2022-03-04 17:04:27 -08:00
|
|
|
let stats = self.stats.clone();
|
2022-06-24 14:52:38 -07:00
|
|
|
|
2022-02-11 22:19:49 -08:00
|
|
|
for shard in 0..self.shards() {
|
2022-06-24 14:52:38 -07:00
|
|
|
let mut round_robin = 0;
|
|
|
|
|
|
2022-02-18 07:10:18 -08:00
|
|
|
for _ in 0..self.servers(shard) {
|
2022-03-04 17:04:27 -08:00
|
|
|
// To keep stats consistent.
|
|
|
|
|
let fake_process_id = 0;
|
|
|
|
|
|
2022-06-24 14:52:38 -07:00
|
|
|
let connection = match self.get(shard, None, fake_process_id, round_robin).await {
|
2022-02-12 09:24:24 -08:00
|
|
|
Ok(conn) => conn,
|
|
|
|
|
Err(err) => {
|
2022-02-20 22:47:08 -08:00
|
|
|
error!("Shard {} down or misconfigured: {:?}", shard, err);
|
2022-02-18 07:10:18 -08:00
|
|
|
continue;
|
2022-02-12 09:24:24 -08:00
|
|
|
}
|
|
|
|
|
};
|
2022-02-11 22:19:49 -08:00
|
|
|
|
2022-06-24 14:52:38 -07:00
|
|
|
let proxy = connection.0;
|
2022-02-26 11:01:52 -08:00
|
|
|
let address = connection.1;
|
2022-06-24 14:52:38 -07:00
|
|
|
let server = &*proxy;
|
2022-02-26 11:01:52 -08:00
|
|
|
let server_info = server.server_info();
|
|
|
|
|
|
2022-03-04 17:04:27 -08:00
|
|
|
stats.client_disconnecting(fake_process_id, address.id);
|
|
|
|
|
|
2022-02-26 11:01:52 -08:00
|
|
|
if server_infos.len() > 0 {
|
|
|
|
|
// Compare against the last server checked.
|
|
|
|
|
if server_info != server_infos[server_infos.len() - 1] {
|
|
|
|
|
warn!(
|
|
|
|
|
"{:?} has different server configuration than the last server",
|
|
|
|
|
address
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
server_infos.push(server_info);
|
2022-06-24 14:52:38 -07:00
|
|
|
round_robin += 1;
|
2022-02-12 09:24:24 -08:00
|
|
|
}
|
2022-02-11 22:19:49 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: compare server information to make sure
|
|
|
|
|
// all shards are running identical configurations.
|
|
|
|
|
if server_infos.len() == 0 {
|
|
|
|
|
return Err(Error::AllServersDown);
|
|
|
|
|
}
|
|
|
|
|
|
2022-06-24 14:52:38 -07:00
|
|
|
self.server_info = server_infos[0].clone();
|
|
|
|
|
|
|
|
|
|
Ok(())
|
2022-02-11 22:19:49 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-08 09:25:59 -08:00
|
|
|
    /// Get a connection from the pool.
    ///
    /// Picks a server in `shard` that matches the requested `role`
    /// (any role when `None`), skipping banned servers and round-robining
    /// across candidates starting at `round_robin`. A connection that has
    /// been idle longer than `healthcheck_delay` is health-checked before
    /// being handed out; a failed or timed-out check bans the server and
    /// the loop moves on to the next candidate.
    ///
    /// Returns `Error::BadConfig` if the requested role is not configured,
    /// and `Error::AllServersDown` once all attempts are exhausted.
    pub async fn get(
        &self,
        shard: usize, // shard number
        role: Option<Role>, // primary or replica
        process_id: i32, // client id
        mut round_robin: usize, // round robin offset
    ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
        // Start the checkout timer; reported to stats on every outcome.
        let now = Instant::now();
        let addresses = &self.addresses[shard];

        let mut allowed_attempts = match role {
            // Primary-specific queries get one attempt, if the primary is down,
            // nothing we should do about it I think. It's dangerous to retry
            // write queries.
            Some(Role::Primary) => 1,

            // Replicas get to try as many times as there are replicas
            // and connections in the pool.
            _ => addresses.len(),
        };

        debug!("Allowed attempts for {:?}: {}", role, allowed_attempts);

        // Fail fast if the requested role is not configured at all.
        let exists = match role {
            Some(role) => addresses.iter().filter(|addr| addr.role == role).count() > 0,
            None => true,
        };

        if !exists {
            error!("Requested role {:?}, but none are configured", role);
            return Err(Error::BadConfig);
        }

        let healthcheck_timeout = get_config().general.healthcheck_timeout;
        let healthcheck_delay = get_config().general.healthcheck_delay as u128;

        while allowed_attempts > 0 {
            // Round-robin replicas.
            round_robin += 1;

            let index = round_robin % addresses.len();
            let address = &addresses[index];

            // Make sure you're getting a primary or a replica
            // as per request. If no specific role is requested, the first
            // available will be chosen.
            // NOTE(review): `Role != Option<Role>` relies on a custom
            // PartialEq impl in config.rs; presumably `None` matches
            // any role -- confirm there.
            if address.role != role {
                continue;
            }

            allowed_attempts -= 1;

            // Don't attempt to connect to banned servers.
            if self.is_banned(address, shard, role) {
                continue;
            }

            // Indicate we're waiting on a server connection from a pool.
            self.stats.client_waiting(process_id, address.id);

            // Check if we can connect
            let mut conn = match self.databases[shard][index].get().await {
                Ok(conn) => conn,
                Err(err) => {
                    error!("Banning replica {}, error: {:?}", index, err);
                    self.ban(address, shard, process_id);
                    self.stats.client_disconnecting(process_id, address.id);
                    self.stats
                        .checkout_time(now.elapsed().as_micros(), process_id, address.id);
                    continue;
                }
            };

            // Check if this server is alive with a health check.
            let server = &mut *conn;

            // Will return error if timestamp is greater than current system time, which it should never be set to
            let require_healthcheck =
                server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay;

            // Recently-used connection: hand it out without a health check.
            if !require_healthcheck {
                self.stats
                    .checkout_time(now.elapsed().as_micros(), process_id, address.id);
                self.stats.server_idle(conn.process_id(), address.id);
                return Ok((conn, address.clone()));
            }

            debug!("Running health check on server {:?}", address);

            self.stats.server_tested(server.process_id(), address.id);

            // ";" is a minimal no-op statement to verify the server responds.
            match tokio::time::timeout(
                tokio::time::Duration::from_millis(healthcheck_timeout),
                server.query(";"),
            )
            .await
            {
                // Check if health check succeeded.
                Ok(res) => match res {
                    Ok(_) => {
                        self.stats
                            .checkout_time(now.elapsed().as_micros(), process_id, address.id);
                        self.stats.server_idle(conn.process_id(), address.id);
                        return Ok((conn, address.clone()));
                    }

                    // Health check failed.
                    Err(_) => {
                        error!("Banning replica {} because of failed health check", index);

                        // Don't leave a bad connection in the pool.
                        server.mark_bad();

                        self.ban(address, shard, process_id);
                        continue;
                    }
                },

                // Health check timed out.
                Err(_) => {
                    error!("Banning replica {} because of health check timeout", index);
                    // Don't leave a bad connection in the pool.
                    server.mark_bad();

                    self.ban(address, shard, process_id);
                    continue;
                }
            }
        }

        // Every candidate was banned, filtered out, or failed its check.
        return Err(Error::AllServersDown);
    }
|
|
|
|
|
|
2022-02-05 13:25:03 -08:00
|
|
|
/// Ban an address (i.e. replica). It no longer will serve
|
|
|
|
|
/// traffic for any new transactions. Existing transactions on that replica
|
|
|
|
|
/// will finish successfully or error out to the clients.
|
2022-08-11 17:42:40 -04:00
|
|
|
pub fn ban(&self, address: &Address, shard: usize, process_id: i32) {
|
|
|
|
|
self.stats.client_disconnecting(process_id, address.id);
|
|
|
|
|
self.stats
|
|
|
|
|
.checkout_time(Instant::now().elapsed().as_micros(), process_id, address.id);
|
|
|
|
|
|
2022-02-20 22:47:08 -08:00
|
|
|
error!("Banning {:?}", address);
|
2022-02-05 10:02:13 -08:00
|
|
|
let now = chrono::offset::Utc::now().naive_utc();
|
2022-02-24 08:44:41 -08:00
|
|
|
let mut guard = self.banlist.write();
|
2022-02-06 11:13:12 -08:00
|
|
|
guard[shard].insert(address.clone(), now);
|
2022-02-05 10:02:13 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-05 13:25:03 -08:00
|
|
|
/// Clear the replica to receive traffic again. Takes effect immediately
|
|
|
|
|
/// for all new transactions.
|
2022-02-08 17:08:17 -08:00
|
|
|
pub fn _unban(&self, address: &Address, shard: usize) {
|
2022-02-24 08:44:41 -08:00
|
|
|
let mut guard = self.banlist.write();
|
2022-02-06 11:13:12 -08:00
|
|
|
guard[shard].remove(address);
|
2022-02-05 10:02:13 -08:00
|
|
|
}
|
|
|
|
|
|
2022-02-05 13:25:03 -08:00
|
|
|
    /// Check if a replica can serve traffic. If all replicas are banned,
    /// we unban all of them. Better to try then not to.
    ///
    /// Also expires individual bans older than `general.ban_time`.
    /// Primaries are never considered banned.
    pub fn is_banned(&self, address: &Address, shard: usize, role: Option<Role>) -> bool {
        // How many servers could serve this role; used to detect the
        // "everything is banned" condition below.
        let replicas_available = match role {
            Some(Role::Replica) => self.addresses[shard]
                .iter()
                .filter(|addr| addr.role == Role::Replica)
                .count(),
            None => self.addresses[shard].len(),
            Some(Role::Primary) => return false, // Primary cannot be banned.
        };

        debug!("Available targets for {:?}: {}", role, replicas_available);

        let guard = self.banlist.read();

        // Everything is banned = nothing is banned.
        // NOTE(review): the read guard is dropped before the write guard is
        // taken, so two threads may both clear the banlist -- presumably
        // benign, but worth confirming.
        if guard[shard].len() == replicas_available {
            drop(guard);
            let mut guard = self.banlist.write();
            guard[shard].clear();
            drop(guard);
            warn!("Unbanning all replicas.");
            return false;
        }

        // I expect this to miss 99.9999% of the time.
        match guard[shard].get(address) {
            Some(timestamp) => {
                let now = chrono::offset::Utc::now().naive_utc();
                let config = get_config();

                // Ban expired.
                if now.timestamp() - timestamp.timestamp() > config.general.ban_time {
                    // Upgrade to a write lock only on the rare expiry path.
                    drop(guard);
                    warn!("Unbanning {:?}", address);
                    let mut guard = self.banlist.write();
                    guard[shard].remove(address);
                    false
                } else {
                    debug!("{:?} is banned", address);
                    true
                }
            }

            None => {
                debug!("{:?} is ok", address);
                false
            }
        }
    }
|
2022-02-05 19:43:48 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Get the number of configured shards.
|
2022-02-05 19:43:48 -08:00
|
|
|
pub fn shards(&self) -> usize {
|
|
|
|
|
self.databases.len()
|
|
|
|
|
}
|
2022-02-12 09:24:24 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Get the number of servers (primary and replicas)
|
|
|
|
|
/// configured for a shard.
|
2022-02-18 07:10:18 -08:00
|
|
|
pub fn servers(&self, shard: usize) -> usize {
|
2022-02-12 09:24:24 -08:00
|
|
|
self.addresses[shard].len()
|
|
|
|
|
}
|
2022-02-28 17:22:28 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Get the total number of servers (databases) we are connected to.
|
2022-03-01 22:49:43 -08:00
|
|
|
pub fn databases(&self) -> usize {
|
|
|
|
|
let mut databases = 0;
|
|
|
|
|
for shard in 0..self.shards() {
|
|
|
|
|
databases += self.servers(shard);
|
|
|
|
|
}
|
|
|
|
|
databases
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Get pool state for a particular shard server as reported by bb8.
|
2022-03-01 08:47:19 -08:00
|
|
|
pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State {
|
2022-02-28 17:22:28 -08:00
|
|
|
self.databases[shard][server].state()
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Get the address information for a shard server.
|
2022-02-28 17:22:28 -08:00
|
|
|
pub fn address(&self, shard: usize, server: usize) -> &Address {
|
|
|
|
|
&self.addresses[shard][server]
|
|
|
|
|
}
|
2022-06-24 14:52:38 -07:00
|
|
|
|
|
|
|
|
    /// Return a copy of the startup server information (K messages)
    /// collected during pool validation; passed to clients when they connect.
    pub fn server_info(&self) -> BytesMut {
        self.server_info.clone()
    }
|
2022-02-05 18:20:53 -08:00
|
|
|
}
|
2022-02-05 10:02:13 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
/// Wrapper for the bb8 connection pool.
/// Holds everything needed to open new connections to one server on demand.
pub struct ServerPool {
    /// Address of the server this pool connects to.
    address: Address,
    /// User to log in as.
    user: User,
    /// Database name to connect to.
    database: String,
    /// Shared client/server map handed to `Server::startup`
    /// (NOTE(review): presumably for query cancellation -- confirm there).
    client_server_map: ClientServerMap,
    /// Stats reporter for server lifecycle events.
    stats: Reporter,
}
|
|
|
|
|
|
|
|
|
|
impl ServerPool {
|
|
|
|
|
pub fn new(
|
|
|
|
|
address: Address,
|
|
|
|
|
user: User,
|
|
|
|
|
database: &str,
|
|
|
|
|
client_server_map: ClientServerMap,
|
2022-02-14 10:00:55 -08:00
|
|
|
stats: Reporter,
|
2022-02-05 18:20:53 -08:00
|
|
|
) -> ServerPool {
|
|
|
|
|
ServerPool {
|
|
|
|
|
address: address,
|
|
|
|
|
user: user,
|
|
|
|
|
database: database.to_string(),
|
|
|
|
|
client_server_map: client_server_map,
|
2022-02-14 10:00:55 -08:00
|
|
|
stats: stats,
|
2022-02-05 10:02:13 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-02-05 18:20:53 -08:00
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
|
impl ManageConnection for ServerPool {
|
|
|
|
|
type Connection = Server;
|
|
|
|
|
type Error = Error;
|
|
|
|
|
|
|
|
|
|
/// Attempts to create a new connection.
|
|
|
|
|
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
2022-02-20 22:47:08 -08:00
|
|
|
info!(
|
|
|
|
|
"Creating a new connection to {:?} using user {:?}",
|
2022-03-04 17:04:27 -08:00
|
|
|
self.address.name(),
|
2022-07-27 21:47:55 -05:00
|
|
|
self.user.username
|
2022-02-20 22:47:08 -08:00
|
|
|
);
|
2022-02-05 18:20:53 -08:00
|
|
|
|
2022-02-21 17:28:50 -08:00
|
|
|
// Put a temporary process_id into the stats
|
|
|
|
|
// for server login.
|
|
|
|
|
let process_id = rand::random::<i32>();
|
2022-03-04 17:04:27 -08:00
|
|
|
self.stats.server_login(process_id, self.address.id);
|
2022-02-21 17:28:50 -08:00
|
|
|
|
2022-03-10 01:33:29 -08:00
|
|
|
// Connect to the PostgreSQL server.
|
2022-02-21 17:28:50 -08:00
|
|
|
match Server::startup(
|
2022-02-15 08:18:01 -08:00
|
|
|
&self.address,
|
|
|
|
|
&self.user,
|
2022-02-05 18:20:53 -08:00
|
|
|
&self.database,
|
|
|
|
|
self.client_server_map.clone(),
|
2022-02-14 10:00:55 -08:00
|
|
|
self.stats.clone(),
|
2022-02-05 18:20:53 -08:00
|
|
|
)
|
|
|
|
|
.await
|
2022-02-21 17:28:50 -08:00
|
|
|
{
|
|
|
|
|
Ok(conn) => {
|
|
|
|
|
// Remove the temporary process_id from the stats.
|
2022-03-04 17:04:27 -08:00
|
|
|
self.stats.server_disconnecting(process_id, self.address.id);
|
2022-02-21 17:28:50 -08:00
|
|
|
Ok(conn)
|
|
|
|
|
}
|
|
|
|
|
Err(err) => {
|
|
|
|
|
// Remove the temporary process_id from the stats.
|
2022-03-04 17:04:27 -08:00
|
|
|
self.stats.server_disconnecting(process_id, self.address.id);
|
2022-02-21 17:28:50 -08:00
|
|
|
Err(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-02-05 18:20:53 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Determines if the connection is still connected to the database.
|
|
|
|
|
async fn is_valid(&self, _conn: &mut PooledConnection<'_, Self>) -> Result<(), Self::Error> {
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Synchronously determine if the connection is no longer usable, if possible.
|
|
|
|
|
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
|
|
|
|
|
conn.is_bad()
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-06-24 14:52:38 -07:00
|
|
|
|
|
|
|
|
/// Get the connection pool
|
2022-07-27 21:47:55 -05:00
|
|
|
pub fn get_pool(db: String, user: String) -> Option<ConnectionPool> {
|
|
|
|
|
match get_all_pools().get(&(db, user)) {
|
|
|
|
|
Some(pool) => Some(pool.clone()),
|
|
|
|
|
None => None,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn get_number_of_addresses() -> usize {
|
|
|
|
|
get_all_pools()
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|(_, pool)| pool.databases())
|
|
|
|
|
.sum()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> {
|
|
|
|
|
return (*(*POOLS.load())).clone();
|
2022-06-24 14:52:38 -07:00
|
|
|
}
|