Compare commits

..

1 Commits

Author SHA1 Message Date
Lev Kokotov
28172cc1d5 Fix debug log 2022-08-11 22:47:22 -07:00
13 changed files with 94 additions and 255 deletions

View File

@@ -11,12 +11,9 @@ host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = 6432
# Whether to enable prometheus exporter or not.
# enable prometheus exporter on port 9930
enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms).
connect_timeout = 100
@@ -91,13 +88,11 @@ password = "sharding_user"
# The maximum number of connections from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users.
pool_size = 9
statement_timeout = 0
[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 30000
# Shard 0
[pools.sharded_db.shards.0]
@@ -135,7 +130,6 @@ sharding_function = "pg_bigint_hash"
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 30000
[pools.simple_db.shards.0]
servers = [

View File

@@ -66,18 +66,6 @@ psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_te
# Replica/primary selection & more sharding tests
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
# Statement timeout tests
sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml
kill -SIGHUP $(pgrep pgcat) # Reload config
sleep 0.2
# This should timeout
(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)')
# Disable statement timeout
sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml
kill -SIGHUP $(pgrep pgcat) # Reload config again
#
# ActiveRecord tests
#

View File

@@ -38,34 +38,30 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
### Config
| **Name** | **Description** | **Examples** |
|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|
| **`general`** | | |
| `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` |
| `port` | The pooler will run on this port. | `6432` |
| `enable_prometheus_exporter` | Enable prometheus exporter which will export metrics in prometheus exposition format. | `true` |
| `prometheus_exporter_port` | Port at which prometheus exporter listens on. | `9930` |
| `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` |
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
| `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| `autoreload` | Enable auto-reload of config after fixed time-interval. | `false` |
| | | |
| **`user`** | | |
| `name` | The user name. | `sharding_user` |
| `password` | The user password in plaintext. | `hunter2` |
| | | |
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
| | | |
| **`query_router`** | | |
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
| **Name** | **Description** | **Examples** |
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|
| **`general`** | | |
| `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` |
| `port` | The pooler will run on this port. | `6432` |
| `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` |
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
| `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| | | |
| **`user`** | | |
| `name` | The user name. | `sharding_user` |
| `password` | The user password in plaintext. | `hunter2` |
| | | |
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
| **`query_router`** | | |
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
## Local development

View File

@@ -1,7 +1,7 @@
version: "3"
services:
postgres:
image: postgres:14
image: postgres:13
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: md5

View File

@@ -11,12 +11,9 @@ host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = 6432
# Whether to enable prometheus exporter or not.
# enable prometheus exporter on port 9930
enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms).
connect_timeout = 5000
@@ -92,14 +89,10 @@ password = "postgres"
# is the sum of pool_size across all users.
pool_size = 9
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
statement_timeout = 0
[pools.sharded.users.1]
username = "postgres"
password = "postgres"
pool_size = 21
statement_timeout = 15000
# Shard 0
[pools.sharded.shards.0]
@@ -137,7 +130,6 @@ sharding_function = "pg_bigint_hash"
username = "postgres"
password = "postgres"
pool_size = 5
statement_timeout = 0
[pools.simple_db.shards.0]
servers = [

View File

@@ -11,12 +11,9 @@ host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = 6432
# Whether to enable prometheus exporter or not.
# enable prometheus exporter on port 9930
enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms).
connect_timeout = 5000
@@ -92,14 +89,10 @@ password = "sharding_user"
# is the sum of pool_size across all users.
pool_size = 9
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
statement_timeout = 0
[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 15000
# Shard 0
[pools.sharded_db.shards.0]
@@ -137,7 +130,6 @@ sharding_function = "pg_bigint_hash"
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 0
[pools.simple_db.shards.0]
servers = [

View File

@@ -44,45 +44,32 @@ where
trace!("Admin query: {}", query);
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
match query_parts[0] {
"RELOAD" => {
trace!("RELOAD");
reload(stream, client_server_map).await
}
"SET" => {
trace!("SET");
ignore_set(stream).await
}
"SHOW" => match query_parts[1] {
"CONFIG" => {
trace!("SHOW CONFIG");
show_config(stream).await
}
"DATABASES" => {
trace!("SHOW DATABASES");
show_databases(stream).await
}
"LISTS" => {
trace!("SHOW LISTS");
show_lists(stream).await
}
"POOLS" => {
trace!("SHOW POOLS");
show_pools(stream).await
}
"STATS" => {
trace!("SHOW STATS");
show_stats(stream).await
}
"VERSION" => {
trace!("SHOW VERSION");
show_version(stream).await
}
_ => error_response(stream, "Unsupported SHOW query against the admin database").await,
},
_ => error_response(stream, "Unsupported query against the admin database").await,
if query.starts_with("SHOW STATS") {
trace!("SHOW STATS");
show_stats(stream).await
} else if query.starts_with("RELOAD") {
trace!("RELOAD");
reload(stream, client_server_map).await
} else if query.starts_with("SHOW CONFIG") {
trace!("SHOW CONFIG");
show_config(stream).await
} else if query.starts_with("SHOW DATABASES") {
trace!("SHOW DATABASES");
show_databases(stream).await
} else if query.starts_with("SHOW POOLS") {
trace!("SHOW POOLS");
show_pools(stream).await
} else if query.starts_with("SHOW LISTS") {
trace!("SHOW LISTS");
show_lists(stream).await
} else if query.starts_with("SHOW VERSION") {
trace!("SHOW VERSION");
show_version(stream).await
} else if query.starts_with("SET ") {
trace!("SET");
ignore_set(stream).await
} else {
error_response(stream, "Unsupported query against the admin database").await
}
}
@@ -187,7 +174,6 @@ where
let columns = vec![
("database", DataType::Text),
("user", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric),

View File

@@ -499,7 +499,7 @@ where
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
let mut round_robin = rand::random();
let mut round_robin = 0;
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
@@ -970,54 +970,17 @@ where
}
async fn receive_server_message(
&mut self,
&self,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<BytesMut, Error> {
if pool.settings.user.statement_timeout > 0 {
match tokio::time::timeout(
tokio::time::Duration::from_millis(pool.settings.user.statement_timeout),
server.recv(),
)
.await
{
Ok(result) => match result {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
)
.await?;
Err(err)
}
},
Err(_) => {
error!(
"Statement timeout while talking to {:?} with user {}",
address, pool.settings.user.username
);
server.mark_bad();
pool.ban(address, shard, self.process_id);
error_response_terminal(&mut self.write, "pool statement timeout").await?;
Err(Error::StatementTimeout)
}
}
} else {
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
)
.await?;
Err(err)
}
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
Err(err)
}
}
}

View File

@@ -100,7 +100,6 @@ pub struct User {
pub username: String,
pub password: String,
pub pool_size: u32,
pub statement_timeout: u64,
}
impl Default for User {
@@ -109,7 +108,6 @@ impl Default for User {
username: String::from("postgres"),
password: String::new(),
pool_size: 15,
statement_timeout: 0,
}
}
}
@@ -120,7 +118,6 @@ pub struct General {
pub host: String,
pub port: i16,
pub enable_prometheus_exporter: Option<bool>,
pub prometheus_exporter_port: i16,
pub connect_timeout: u64,
pub healthcheck_timeout: u64,
pub shutdown_timeout: u64,
@@ -139,7 +136,6 @@ impl Default for General {
host: String::from("localhost"),
port: 5432,
enable_prometheus_exporter: Some(false),
prometheus_exporter_port: 9930,
connect_timeout: 5000,
healthcheck_timeout: 1000,
shutdown_timeout: 60000,
@@ -275,10 +271,6 @@ impl From<&Config> for std::collections::HashMap<String, String> {
let mut static_settings = vec![
("host".to_string(), config.general.host.to_string()),
("port".to_string(), config.general.port.to_string()),
(
"prometheus_exporter_port".to_string(),
config.general.prometheus_exporter_port.to_string(),
),
(
"connect_timeout".to_string(),
config.general.connect_timeout.to_string(),
@@ -334,7 +326,6 @@ impl Config {
};
for (pool_name, pool_config) in &self.pools {
// TODO: Make this output prettier (maybe a table?)
info!("--- Settings for pool {} ---", pool_name);
info!(
"Pool size from all users: {}",
@@ -349,17 +340,8 @@ impl Config {
info!("Sharding function: {}", pool_config.sharding_function);
info!("Primary reads: {}", pool_config.primary_reads_enabled);
info!("Query router: {}", pool_config.query_parser_enabled);
// TODO: Make this prettier.
info!("Number of shards: {}", pool_config.shards.len());
info!("Number of users: {}", pool_config.users.len());
for user in &pool_config.users {
info!(
"{} pool size: {}, statement timeout: {}",
user.1.username, user.1.pool_size, user.1.statement_timeout
);
}
}
}
}

View File

@@ -11,5 +11,4 @@ pub enum Error {
AllServersDown,
ClientError,
TlsError,
StatementTimeout,
}

View File

@@ -99,10 +99,7 @@ async fn main() {
let config = get_config();
if let Some(true) = config.general.enable_prometheus_exporter {
let http_addr_str = format!(
"{}:{}",
config.general.host, config.general.prometheus_exporter_port
);
let http_addr_str = format!("{}:{}", config.general.host, crate::prometheus::HTTP_PORT);
let http_addr = match SocketAddr::from_str(&http_addr_str) {
Ok(addr) => addr,
Err(err) => {
@@ -133,7 +130,7 @@ async fn main() {
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
// Statistics reporting.
let (tx, rx) = mpsc::channel(100_000);
let (tx, rx) = mpsc::channel(100);
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
// Connection pool that allows to query all shards and replicas.

View File

@@ -10,6 +10,8 @@ use crate::config::Address;
use crate::pool::get_all_pools;
use crate::stats::get_stats;
pub const HTTP_PORT: usize = 9930;
struct MetricHelpType {
help: &'static str,
ty: &'static str,

View File

@@ -1,11 +1,9 @@
use arc_swap::ArcSwap;
/// Statistics and reporting.
use log::{error, info, trace};
use log::info;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::time::SystemTime;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::pool::get_number_of_addresses;
@@ -43,29 +41,9 @@ enum EventName {
UpdateAverages,
}
/// Send an event via the channel and log
/// an error if it fails.
fn send(tx: &Sender<Event>, event: Event) {
let name = event.name;
let result = tx.try_send(event);
match result {
Ok(_) => trace!(
"{:?} event reported successfully, capacity: {}",
name,
tx.capacity()
),
Err(err) => match err {
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
},
};
}
/// Event data sent to the collector
/// from clients and servers.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Event {
/// The name of the event being reported.
name: EventName,
@@ -111,7 +89,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event);
let _ = self.tx.try_send(event);
}
/// Report a transaction executed by a client against
@@ -124,7 +102,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Report data sent to a server identified by `address_id`.
@@ -137,7 +115,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Report data received from a server identified by `address_id`.
@@ -150,7 +128,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Time spent waiting to get a healthy connection from the pool
@@ -164,7 +142,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a client identified by `process_id` waiting for a connection
@@ -177,7 +155,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a client identified by `process_id` is done waiting for a connection
@@ -190,7 +168,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a client identified by `process_id` is done querying the server
@@ -203,7 +181,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a client identified by `process_id` is disconnecting from the pooler.
@@ -216,7 +194,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a server connection identified by `process_id` for
@@ -230,7 +208,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a server connection identified by `process_id` for
@@ -244,7 +222,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a server connection identified by `process_id` for
@@ -258,7 +236,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a server connection identified by `process_id` for
@@ -272,7 +250,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
/// Reports a server connection identified by `process_id` is disconnecting from the pooler.
@@ -285,7 +263,7 @@ impl Reporter {
address_id: address_id,
};
send(&self.tx, event)
let _ = self.tx.try_send(event);
}
}
@@ -347,9 +325,6 @@ impl Collector {
// Track which state the client and server are at any given time.
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
// Average update times
let mut last_updated_avg: HashMap<usize, SystemTime> = HashMap::new();
// Flush stats to StatsD and calculate averages every 15 seconds.
let tx = self.tx.clone();
tokio::task::spawn(async move {
@@ -359,15 +334,12 @@ impl Collector {
interval.tick().await;
let address_count = get_number_of_addresses();
for address_id in 0..address_count {
send(
&tx,
Event {
name: EventName::UpdateStats,
value: 0,
process_id: -1,
address_id: address_id,
},
);
let _ = tx.try_send(Event {
name: EventName::UpdateStats,
value: 0,
process_id: -1,
address_id: address_id,
});
}
}
});
@@ -380,15 +352,12 @@ impl Collector {
interval.tick().await;
let address_count = get_number_of_addresses();
for address_id in 0..address_count {
send(
&tx,
Event {
name: EventName::UpdateAverages,
value: 0,
process_id: -1,
address_id: address_id,
},
);
let _ = tx.try_send(Event {
name: EventName::UpdateAverages,
value: 0,
process_id: -1,
address_id: address_id,
});
}
}
});
@@ -410,9 +379,6 @@ impl Collector {
.entry(stat.address_id)
.or_insert(HashMap::new());
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
let last_updated_avg = last_updated_avg
.entry(stat.address_id)
.or_insert(SystemTime::now());
// Some are counters, some are gauges...
match stat.name {
@@ -538,24 +504,6 @@ impl Collector {
}
EventName::UpdateAverages => {
let elapsed = match last_updated_avg.elapsed() {
Ok(elapsed) => elapsed.as_secs(),
Err(err) => {
error!(
"Could not get elapsed time, averages may be incorrect: {:?}",
err
);
STAT_PERIOD / 1_000
}
} as i64;
*last_updated_avg = SystemTime::now();
// Tokio triggers the interval on first tick and then sleeps.
if elapsed == 0 {
continue;
}
// Calculate averages
for stat in &[
"avg_query_count",
@@ -573,7 +521,7 @@ impl Collector {
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
let avg = (new_value - *old_value) / elapsed; // Avg / second
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
stats.insert(stat, avg);
*old_value = new_value;