mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 18:56:30 +00:00
Add defaults for configs (#174)
* add statement timeout to readme * Add defaults to various configs * primary read enabled default to false
This commit is contained in:
@@ -60,6 +60,8 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
|
|||||||
| **`user`** | | |
|
| **`user`** | | |
|
||||||
| `name` | The user name. | `sharding_user` |
|
| `name` | The user name. | `sharding_user` |
|
||||||
| `password` | The user password in plaintext. | `hunter2` |
|
| `password` | The user password in plaintext. | `hunter2` |
|
||||||
|
| `statement_timeout` | Timeout in milliseconds for how long a query takes to execute | `0 (disabled)` |
|
||||||
|
|
||||||
| | | |
|
| | | |
|
||||||
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
|
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
|
||||||
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
|
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
|
||||||
|
|||||||
@@ -9,11 +9,11 @@ use tokio::sync::broadcast::Receiver;
|
|||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
use crate::admin::{generate_server_info_for_admin, handle_admin};
|
use crate::admin::{generate_server_info_for_admin, handle_admin};
|
||||||
use crate::config::{get_config, Address};
|
use crate::config::{get_config, Address, PoolMode};
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::pool::{get_pool, ClientServerMap, ConnectionPool, PoolMode};
|
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
|
||||||
use crate::query_router::{Command, QueryRouter};
|
use crate::query_router::{Command, QueryRouter};
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
use crate::stats::{get_reporter, Reporter};
|
use crate::stats::{get_reporter, Reporter};
|
||||||
|
|||||||
139
src/config.rs
139
src/config.rs
@@ -127,6 +127,7 @@ pub struct User {
|
|||||||
pub username: String,
|
pub username: String,
|
||||||
pub password: String,
|
pub password: String,
|
||||||
pub pool_size: u32,
|
pub pool_size: u32,
|
||||||
|
#[serde(default)] // 0
|
||||||
pub statement_timeout: u64,
|
pub statement_timeout: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -144,34 +145,81 @@ impl Default for User {
|
|||||||
/// General configuration.
|
/// General configuration.
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct General {
|
pub struct General {
|
||||||
|
#[serde(default = "General::default_host")]
|
||||||
pub host: String,
|
pub host: String,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_port")]
|
||||||
pub port: i16,
|
pub port: i16,
|
||||||
|
|
||||||
pub enable_prometheus_exporter: Option<bool>,
|
pub enable_prometheus_exporter: Option<bool>,
|
||||||
pub prometheus_exporter_port: i16,
|
pub prometheus_exporter_port: i16,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_connect_timeout")]
|
||||||
pub connect_timeout: u64,
|
pub connect_timeout: u64,
|
||||||
pub healthcheck_timeout: u64,
|
|
||||||
|
#[serde(default = "General::default_shutdown_timeout")]
|
||||||
pub shutdown_timeout: u64,
|
pub shutdown_timeout: u64,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_healthcheck_timeout")]
|
||||||
|
pub healthcheck_timeout: u64,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_healthcheck_delay")]
|
||||||
pub healthcheck_delay: u64,
|
pub healthcheck_delay: u64,
|
||||||
|
|
||||||
|
#[serde(default = "General::default_ban_time")]
|
||||||
pub ban_time: i64,
|
pub ban_time: i64,
|
||||||
|
|
||||||
|
#[serde(default)] // False
|
||||||
pub autoreload: bool,
|
pub autoreload: bool,
|
||||||
|
|
||||||
pub tls_certificate: Option<String>,
|
pub tls_certificate: Option<String>,
|
||||||
pub tls_private_key: Option<String>,
|
pub tls_private_key: Option<String>,
|
||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl General {
|
||||||
|
fn default_host() -> String {
|
||||||
|
"0.0.0.0".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_port() -> i16 {
|
||||||
|
5432
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_connect_timeout() -> u64 {
|
||||||
|
1000
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_shutdown_timeout() -> u64 {
|
||||||
|
60000
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_healthcheck_timeout() -> u64 {
|
||||||
|
1000
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_healthcheck_delay() -> u64 {
|
||||||
|
30000
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_ban_time() -> i64 {
|
||||||
|
60
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for General {
|
impl Default for General {
|
||||||
fn default() -> General {
|
fn default() -> General {
|
||||||
General {
|
General {
|
||||||
host: String::from("localhost"),
|
host: General::default_host(),
|
||||||
port: 5432,
|
port: General::default_port(),
|
||||||
enable_prometheus_exporter: Some(false),
|
enable_prometheus_exporter: Some(false),
|
||||||
prometheus_exporter_port: 9930,
|
prometheus_exporter_port: 9930,
|
||||||
connect_timeout: 5000,
|
connect_timeout: General::default_connect_timeout(),
|
||||||
healthcheck_timeout: 1000,
|
shutdown_timeout: General::default_shutdown_timeout(),
|
||||||
shutdown_timeout: 60000,
|
healthcheck_timeout: General::default_healthcheck_timeout(),
|
||||||
healthcheck_delay: 30000,
|
healthcheck_delay: General::default_healthcheck_delay(),
|
||||||
ban_time: 60,
|
ban_time: General::default_ban_time(),
|
||||||
autoreload: false,
|
autoreload: false,
|
||||||
tls_certificate: None,
|
tls_certificate: None,
|
||||||
tls_private_key: None,
|
tls_private_key: None,
|
||||||
@@ -180,25 +228,61 @@ impl Default for General {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pool mode:
|
||||||
|
/// - transaction: server serves one transaction,
|
||||||
|
/// - session: server is attached to the client.
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Copy)]
|
||||||
|
pub enum PoolMode {
|
||||||
|
#[serde(alias = "transaction", alias = "Transaction")]
|
||||||
|
Transaction,
|
||||||
|
|
||||||
|
#[serde(alias = "session", alias = "Session")]
|
||||||
|
Session,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ToString for PoolMode {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
|
match *self {
|
||||||
|
PoolMode::Transaction => "transaction".to_string(),
|
||||||
|
PoolMode::Session => "session".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct Pool {
|
pub struct Pool {
|
||||||
pub pool_mode: String,
|
#[serde(default = "Pool::default_pool_mode")]
|
||||||
|
pub pool_mode: PoolMode,
|
||||||
|
|
||||||
pub default_role: String,
|
pub default_role: String,
|
||||||
|
|
||||||
|
#[serde(default)] // False
|
||||||
pub query_parser_enabled: bool,
|
pub query_parser_enabled: bool,
|
||||||
|
|
||||||
|
#[serde(default)] // False
|
||||||
pub primary_reads_enabled: bool,
|
pub primary_reads_enabled: bool,
|
||||||
|
|
||||||
pub sharding_function: String,
|
pub sharding_function: String,
|
||||||
pub shards: HashMap<String, Shard>,
|
pub shards: HashMap<String, Shard>,
|
||||||
pub users: HashMap<String, User>,
|
pub users: HashMap<String, User>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Pool {
|
||||||
|
fn default_pool_mode() -> PoolMode {
|
||||||
|
PoolMode::Transaction
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for Pool {
|
impl Default for Pool {
|
||||||
fn default() -> Pool {
|
fn default() -> Pool {
|
||||||
Pool {
|
Pool {
|
||||||
pool_mode: String::from("transaction"),
|
pool_mode: Pool::default_pool_mode(),
|
||||||
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
||||||
users: HashMap::default(),
|
users: HashMap::default(),
|
||||||
default_role: String::from("any"),
|
default_role: String::from("any"),
|
||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
primary_reads_enabled: true,
|
primary_reads_enabled: false,
|
||||||
sharding_function: "pg_bigint_hash".to_string(),
|
sharding_function: "pg_bigint_hash".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -231,10 +315,6 @@ impl Default for Shard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_path() -> String {
|
|
||||||
String::from("pgcat.toml")
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Configuration wrapper.
|
/// Configuration wrapper.
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -249,17 +329,23 @@ pub struct Config {
|
|||||||
// [main.subconf]
|
// [main.subconf]
|
||||||
// field1_under_subconf = 1
|
// field1_under_subconf = 1
|
||||||
// field3_under_main = 3 # This field will be interpreted as being under subconf and not under main
|
// field3_under_main = 3 # This field will be interpreted as being under subconf and not under main
|
||||||
#[serde(default = "default_path")]
|
#[serde(default = "Config::default_path")]
|
||||||
pub path: String,
|
pub path: String,
|
||||||
|
|
||||||
pub general: General,
|
pub general: General,
|
||||||
pub pools: HashMap<String, Pool>,
|
pub pools: HashMap<String, Pool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Config {
|
||||||
|
fn default_path() -> String {
|
||||||
|
String::from("pgcat.toml")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
fn default() -> Config {
|
fn default() -> Config {
|
||||||
Config {
|
Config {
|
||||||
path: String::from("pgcat.toml"),
|
path: Config::default_path(),
|
||||||
general: General::default(),
|
general: General::default(),
|
||||||
pools: HashMap::default(),
|
pools: HashMap::default(),
|
||||||
}
|
}
|
||||||
@@ -275,7 +361,7 @@ impl From<&Config> for std::collections::HashMap<String, String> {
|
|||||||
[
|
[
|
||||||
(
|
(
|
||||||
format!("pools.{}.pool_mode", pool_name),
|
format!("pools.{}.pool_mode", pool_name),
|
||||||
pool.pool_mode.clone(),
|
pool.pool_mode.to_string(),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
format!("pools.{}.primary_reads_enabled", pool_name),
|
format!("pools.{}.primary_reads_enabled", pool_name),
|
||||||
@@ -383,7 +469,10 @@ impl Config {
|
|||||||
.sum::<u32>()
|
.sum::<u32>()
|
||||||
.to_string()
|
.to_string()
|
||||||
);
|
);
|
||||||
info!("[pool: {}] Pool mode: {}", pool_name, pool_config.pool_mode);
|
info!(
|
||||||
|
"[pool: {}] Pool mode: {:?}",
|
||||||
|
pool_name, pool_config.pool_mode
|
||||||
|
);
|
||||||
info!(
|
info!(
|
||||||
"[pool: {}] Sharding function: {}",
|
"[pool: {}] Sharding function: {}",
|
||||||
pool_name, pool_config.sharding_function
|
pool_name, pool_config.sharding_function
|
||||||
@@ -513,18 +602,6 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match pool.pool_mode.as_ref() {
|
|
||||||
"transaction" => (),
|
|
||||||
"session" => (),
|
|
||||||
other => {
|
|
||||||
error!(
|
|
||||||
"pool_mode can be 'session' or 'transaction', got: '{}'",
|
|
||||||
other
|
|
||||||
);
|
|
||||||
return Err(Error::BadConfig);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
for shard in &pool.shards {
|
for shard in &pool.shards {
|
||||||
// We use addresses as unique identifiers,
|
// We use addresses as unique identifiers,
|
||||||
// let's make sure they are unique in the config as well.
|
// let's make sure they are unique in the config as well.
|
||||||
|
|||||||
26
src/pool.rs
26
src/pool.rs
@@ -12,7 +12,7 @@ use std::collections::HashMap;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use crate::config::{get_config, Address, Role, User};
|
use crate::config::{get_config, Address, PoolMode, Role, User};
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
|
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
@@ -27,24 +27,6 @@ pub type PoolMap = HashMap<(String, String), ConnectionPool>;
|
|||||||
/// The pool is recreated dynamically when the config is reloaded.
|
/// The pool is recreated dynamically when the config is reloaded.
|
||||||
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
|
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
|
||||||
|
|
||||||
/// Pool mode:
|
|
||||||
/// - transaction: server serves one transaction,
|
|
||||||
/// - session: server is attached to the client.
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
|
||||||
pub enum PoolMode {
|
|
||||||
Session,
|
|
||||||
Transaction,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for PoolMode {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
||||||
match *self {
|
|
||||||
PoolMode::Session => write!(f, "session"),
|
|
||||||
PoolMode::Transaction => write!(f, "transaction"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Pool settings.
|
/// Pool settings.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct PoolSettings {
|
pub struct PoolSettings {
|
||||||
@@ -199,11 +181,7 @@ impl ConnectionPool {
|
|||||||
stats: get_reporter(),
|
stats: get_reporter(),
|
||||||
server_info: BytesMut::new(),
|
server_info: BytesMut::new(),
|
||||||
settings: PoolSettings {
|
settings: PoolSettings {
|
||||||
pool_mode: match pool_config.pool_mode.as_str() {
|
pool_mode: pool_config.pool_mode,
|
||||||
"transaction" => PoolMode::Transaction,
|
|
||||||
"session" => PoolMode::Session,
|
|
||||||
_ => unreachable!(),
|
|
||||||
},
|
|
||||||
// shards: pool_config.shards.clone(),
|
// shards: pool_config.shards.clone(),
|
||||||
shards: shard_ids.len(),
|
shards: shard_ids.len(),
|
||||||
user: user.clone(),
|
user: user.clone(),
|
||||||
|
|||||||
@@ -359,8 +359,8 @@ impl QueryRouter {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::config::PoolMode;
|
||||||
use crate::messages::simple_query;
|
use crate::messages::simple_query;
|
||||||
use crate::pool::PoolMode;
|
|
||||||
use crate::sharding::ShardingFunction;
|
use crate::sharding::ShardingFunction;
|
||||||
use bytes::BufMut;
|
use bytes::BufMut;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user