mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
14 Commits
levkk-dist
...
v0.3.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d33ccf4b0 | ||
|
|
7987c5ffad | ||
|
|
24f5eec3ea | ||
|
|
af064ef447 | ||
|
|
e84a6f834c | ||
|
|
19fd677891 | ||
|
|
964a5e1708 | ||
|
|
d126c7424d | ||
|
|
f72dac420b | ||
|
|
3a729bb75b | ||
|
|
85cc2f4147 | ||
|
|
8c09ab6c20 | ||
|
|
f7a951745c | ||
|
|
4ae1bc8d32 |
106
Cargo.lock
generated
106
Cargo.lock
generated
@@ -53,14 +53,14 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
|
||||
|
||||
[[package]]
|
||||
name = "bb8"
|
||||
version = "0.7.1"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2e9f4fa9768efd269499d8fba693260cfc670891cf6de3adc935588447a77cc8"
|
||||
checksum = "1627eccf3aa91405435ba240be23513eeca466b5dc33866422672264de061582"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"parking_lot",
|
||||
"parking_lot 0.12.1",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -493,7 +493,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
|
||||
dependencies = [
|
||||
"instant",
|
||||
"lock_api",
|
||||
"parking_lot_core",
|
||||
"parking_lot_core 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -510,6 +520,19 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pgcat"
|
||||
version = "0.6.0-alpha1"
|
||||
@@ -528,7 +551,7 @@ dependencies = [
|
||||
"md-5",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"parking_lot 0.12.1",
|
||||
"phf",
|
||||
"rand",
|
||||
"regex",
|
||||
@@ -546,20 +569,19 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "phf"
|
||||
version = "0.10.1"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259"
|
||||
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
|
||||
dependencies = [
|
||||
"phf_macros",
|
||||
"phf_shared",
|
||||
"proc-macro-hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "phf_generator"
|
||||
version = "0.10.0"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6"
|
||||
checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf"
|
||||
dependencies = [
|
||||
"phf_shared",
|
||||
"rand",
|
||||
@@ -567,13 +589,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "phf_macros"
|
||||
version = "0.10.0"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "58fdf3184dd560f160dd73922bea2d5cd6e8f064bf4b13110abd81b03697b4e0"
|
||||
checksum = "92aacdc5f16768709a569e913f7451034034178b05bdc8acda226659a3dccc66"
|
||||
dependencies = [
|
||||
"phf_generator",
|
||||
"phf_shared",
|
||||
"proc-macro-hack",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
@@ -581,9 +602,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "phf_shared"
|
||||
version = "0.10.0"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
|
||||
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
|
||||
dependencies = [
|
||||
"siphasher",
|
||||
]
|
||||
@@ -606,12 +627,6 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-hack"
|
||||
version = "0.5.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.36"
|
||||
@@ -832,9 +847,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.14.0"
|
||||
version = "0.23.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8f192f29f4aa49e57bebd0aa05858e0a1f32dd270af36efe49edb82cbfffab6"
|
||||
checksum = "0beb13adabbdda01b63d595f38c8bfd19a361e697fd94ce0098a634077bc5b25"
|
||||
dependencies = [
|
||||
"log",
|
||||
]
|
||||
@@ -913,7 +928,7 @@ dependencies = [
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"parking_lot 0.11.2",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"tokio-macros",
|
||||
@@ -1168,3 +1183,46 @@ name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
|
||||
dependencies = [
|
||||
"windows_aarch64_msvc",
|
||||
"windows_i686_gnu",
|
||||
"windows_i686_msvc",
|
||||
"windows_x86_64_gnu",
|
||||
"windows_x86_64_msvc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
|
||||
|
||||
@@ -9,7 +9,7 @@ edition = "2021"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
bytes = "1"
|
||||
md-5 = "0.10"
|
||||
bb8 = "0.7"
|
||||
bb8 = "0.8.0"
|
||||
async-trait = "0.1"
|
||||
rand = "0.8"
|
||||
chrono = "0.4"
|
||||
@@ -20,11 +20,11 @@ serde_derive = "1"
|
||||
regex = "1"
|
||||
num_cpus = "1"
|
||||
once_cell = "1"
|
||||
sqlparser = "0.14"
|
||||
sqlparser = "0.23.0"
|
||||
log = "0.4"
|
||||
arc-swap = "1"
|
||||
env_logger = "0.9"
|
||||
parking_lot = "0.11"
|
||||
parking_lot = "0.12.1"
|
||||
hmac = "0.12"
|
||||
sha2 = "0.10"
|
||||
base64 = "0.13"
|
||||
@@ -32,5 +32,5 @@ stringprep = "0.1"
|
||||
tokio-rustls = "0.23"
|
||||
rustls-pemfile = "1"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
phf = { version = "0.10", features = ["macros"] }
|
||||
phf = { version = "0.11.1", features = ["macros"] }
|
||||
exitcode = "1.1.2"
|
||||
|
||||
@@ -60,6 +60,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
|
||||
| **`user`** | | |
|
||||
| `name` | The user name. | `sharding_user` |
|
||||
| `password` | The user password in plaintext. | `hunter2` |
|
||||
| `statement_timeout` | Timeout in milliseconds for how long a query takes to execute | `0 (disabled)`
|
||||
| | | |
|
||||
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
|
||||
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
|
||||
|
||||
211
src/admin.rs
211
src/admin.rs
@@ -2,22 +2,25 @@
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{info, trace};
|
||||
use std::collections::HashMap;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::config::{get_config, reload_config, VERSION};
|
||||
use crate::errors::Error;
|
||||
use crate::messages::*;
|
||||
use crate::pool::get_all_pools;
|
||||
use crate::stats::get_stats;
|
||||
use crate::stats::{
|
||||
get_address_stats, get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState,
|
||||
};
|
||||
use crate::ClientServerMap;
|
||||
|
||||
pub fn generate_server_info_for_admin() -> BytesMut {
|
||||
let mut server_info = BytesMut::new();
|
||||
|
||||
server_info.put(server_paramater_message("application_name", ""));
|
||||
server_info.put(server_paramater_message("client_encoding", "UTF8"));
|
||||
server_info.put(server_paramater_message("server_encoding", "UTF8"));
|
||||
server_info.put(server_paramater_message("server_version", VERSION));
|
||||
server_info.put(server_paramater_message("DateStyle", "ISO, MDY"));
|
||||
server_info.put(server_parameter_message("application_name", ""));
|
||||
server_info.put(server_parameter_message("client_encoding", "UTF8"));
|
||||
server_info.put(server_parameter_message("server_encoding", "UTF8"));
|
||||
server_info.put(server_parameter_message("server_version", VERSION));
|
||||
server_info.put(server_parameter_message("DateStyle", "ISO, MDY"));
|
||||
|
||||
return server_info;
|
||||
}
|
||||
@@ -72,6 +75,14 @@ where
|
||||
trace!("SHOW POOLS");
|
||||
show_pools(stream).await
|
||||
}
|
||||
"CLIENTS" => {
|
||||
trace!("SHOW CLIENTS");
|
||||
show_clients(stream).await
|
||||
}
|
||||
"SERVERS" => {
|
||||
trace!("SHOW SERVERS");
|
||||
show_servers(stream).await
|
||||
}
|
||||
"STATS" => {
|
||||
trace!("SHOW STATS");
|
||||
show_stats(stream).await
|
||||
@@ -91,7 +102,8 @@ async fn show_lists<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let stats = get_stats();
|
||||
let client_stats = get_client_stats();
|
||||
let server_stats = get_server_stats();
|
||||
|
||||
let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];
|
||||
|
||||
@@ -111,18 +123,18 @@ where
|
||||
res.put(data_row(&vec!["pools".to_string(), databases.to_string()]));
|
||||
res.put(data_row(&vec![
|
||||
"free_clients".to_string(),
|
||||
stats
|
||||
client_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["cl_idle"])
|
||||
.sum::<i64>()
|
||||
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Idle)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_clients".to_string(),
|
||||
stats
|
||||
client_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["cl_active"])
|
||||
.sum::<i64>()
|
||||
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Active)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
@@ -131,18 +143,18 @@ where
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"free_servers".to_string(),
|
||||
stats
|
||||
server_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["sv_idle"])
|
||||
.sum::<i64>()
|
||||
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Idle)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec![
|
||||
"used_servers".to_string(),
|
||||
stats
|
||||
server_stats
|
||||
.keys()
|
||||
.map(|address_id| stats[&address_id]["sv_active"])
|
||||
.sum::<i64>()
|
||||
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Active)
|
||||
.count()
|
||||
.to_string(),
|
||||
]));
|
||||
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
|
||||
@@ -182,11 +194,12 @@ async fn show_pools<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let stats = get_stats();
|
||||
let all_pool_stats = get_pool_stats();
|
||||
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("pool_mode", DataType::Text),
|
||||
("cl_idle", DataType::Numeric),
|
||||
("cl_active", DataType::Numeric),
|
||||
("cl_waiting", DataType::Numeric),
|
||||
@@ -198,32 +211,33 @@ where
|
||||
("sv_login", DataType::Numeric),
|
||||
("maxwait", DataType::Numeric),
|
||||
("maxwait_us", DataType::Numeric),
|
||||
("pool_mode", DataType::Text),
|
||||
];
|
||||
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
for (_, pool) in get_all_pools() {
|
||||
for (user_pool, pool) in get_all_pools() {
|
||||
let def = HashMap::default();
|
||||
let pool_stats = all_pool_stats
|
||||
.get(&(user_pool.db.clone(), user_pool.user.clone()))
|
||||
.unwrap_or(&def);
|
||||
|
||||
let pool_config = &pool.settings;
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let stats = match stats.get(&address.id) {
|
||||
Some(stats) => stats.clone(),
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let mut row = vec![address.name(), pool_config.user.username.clone()];
|
||||
|
||||
for column in &columns[2..columns.len() - 1] {
|
||||
let value = stats.get(column.0).unwrap_or(&0).to_string();
|
||||
row.push(value);
|
||||
let mut row = vec![
|
||||
user_pool.db.clone(),
|
||||
user_pool.user.clone(),
|
||||
pool_config.pool_mode.to_string(),
|
||||
];
|
||||
for column in &columns[3..columns.len()] {
|
||||
let value = match column.0 {
|
||||
"maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(),
|
||||
"maxwait_us" => {
|
||||
(pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string()
|
||||
}
|
||||
|
||||
row.push(pool_config.pool_mode.to_string());
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
_other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(),
|
||||
};
|
||||
row.push(value);
|
||||
}
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
@@ -387,6 +401,7 @@ where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let columns = vec![
|
||||
("instance", DataType::Text),
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("total_xact_count", DataType::Numeric),
|
||||
@@ -396,32 +411,32 @@ where
|
||||
("total_xact_time", DataType::Numeric),
|
||||
("total_query_time", DataType::Numeric),
|
||||
("total_wait_time", DataType::Numeric),
|
||||
("total_errors", DataType::Numeric),
|
||||
("avg_xact_count", DataType::Numeric),
|
||||
("avg_query_count", DataType::Numeric),
|
||||
("avg_recv", DataType::Numeric),
|
||||
("avg_sent", DataType::Numeric),
|
||||
("avg_errors", DataType::Numeric),
|
||||
("avg_xact_time", DataType::Numeric),
|
||||
("avg_query_time", DataType::Numeric),
|
||||
("avg_wait_time", DataType::Numeric),
|
||||
];
|
||||
|
||||
let stats = get_stats();
|
||||
let all_stats = get_address_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for ((_db_name, username), pool) in get_all_pools() {
|
||||
for (user_pool, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let stats = match stats.get(&address.id) {
|
||||
let stats = match all_stats.get(&address.id) {
|
||||
Some(stats) => stats.clone(),
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let mut row = vec![address.name()];
|
||||
row.push(username.clone());
|
||||
|
||||
for column in &columns[2..] {
|
||||
let mut row = vec![address.name(), user_pool.db.clone(), user_pool.user.clone()];
|
||||
for column in &columns[3..] {
|
||||
row.push(stats.get(column.0).unwrap_or(&0).to_string());
|
||||
}
|
||||
|
||||
@@ -439,3 +454,107 @@ where
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// Show currently connected clients
|
||||
async fn show_clients<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let columns = vec![
|
||||
("client_id", DataType::Text),
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("application_name", DataType::Text),
|
||||
("state", DataType::Text),
|
||||
("transaction_count", DataType::Numeric),
|
||||
("query_count", DataType::Numeric),
|
||||
("error_count", DataType::Numeric),
|
||||
("age_seconds", DataType::Numeric),
|
||||
];
|
||||
|
||||
let new_map = get_client_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for (_, client) in new_map {
|
||||
let row = vec![
|
||||
format!("{:#010X}", client.client_id),
|
||||
client.pool_name,
|
||||
client.username,
|
||||
client.application_name.clone(),
|
||||
client.state.to_string(),
|
||||
client.transaction_count.to_string(),
|
||||
client.query_count.to_string(),
|
||||
client.error_count.to_string(),
|
||||
Instant::now()
|
||||
.duration_since(client.connect_time)
|
||||
.as_secs()
|
||||
.to_string(),
|
||||
];
|
||||
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
/// Show currently connected servers
|
||||
async fn show_servers<T>(stream: &mut T) -> Result<(), Error>
|
||||
where
|
||||
T: tokio::io::AsyncWrite + std::marker::Unpin,
|
||||
{
|
||||
let columns = vec![
|
||||
("server_id", DataType::Text),
|
||||
("database_name", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("address_id", DataType::Text),
|
||||
("application_name", DataType::Text),
|
||||
("state", DataType::Text),
|
||||
("transaction_count", DataType::Numeric),
|
||||
("query_count", DataType::Numeric),
|
||||
("bytes_sent", DataType::Numeric),
|
||||
("bytes_received", DataType::Numeric),
|
||||
("age_seconds", DataType::Numeric),
|
||||
];
|
||||
|
||||
let new_map = get_server_stats();
|
||||
let mut res = BytesMut::new();
|
||||
res.put(row_description(&columns));
|
||||
|
||||
for (_, server) in new_map {
|
||||
let row = vec![
|
||||
format!("{:#010X}", server.server_id),
|
||||
server.pool_name,
|
||||
server.username,
|
||||
server.address_name,
|
||||
server.application_name,
|
||||
server.state.to_string(),
|
||||
server.transaction_count.to_string(),
|
||||
server.query_count.to_string(),
|
||||
server.bytes_sent.to_string(),
|
||||
server.bytes_received.to_string(),
|
||||
Instant::now()
|
||||
.duration_since(server.connect_time)
|
||||
.as_secs()
|
||||
.to_string(),
|
||||
];
|
||||
|
||||
res.put(data_row(&row));
|
||||
}
|
||||
|
||||
res.put(command_complete("SHOW"));
|
||||
|
||||
// ReadyForQuery
|
||||
res.put_u8(b'Z');
|
||||
res.put_i32(5);
|
||||
res.put_u8(b'I');
|
||||
|
||||
write_all_half(stream, res).await
|
||||
}
|
||||
|
||||
@@ -1,18 +1,19 @@
|
||||
/// Handle clients by pretending to be a PostgreSQL server.
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{debug, error, info, trace};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use std::collections::HashMap;
|
||||
use std::time::Instant;
|
||||
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::admin::{generate_server_info_for_admin, handle_admin};
|
||||
use crate::config::{get_config, Address};
|
||||
use crate::config::{get_config, Address, PoolMode};
|
||||
use crate::constants::*;
|
||||
use crate::errors::Error;
|
||||
use crate::messages::*;
|
||||
use crate::pool::{get_pool, ClientServerMap, ConnectionPool, PoolMode};
|
||||
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
|
||||
use crate::query_router::{Command, QueryRouter};
|
||||
use crate::server::Server;
|
||||
use crate::stats::{get_reporter, Reporter};
|
||||
@@ -435,7 +436,7 @@ where
|
||||
);
|
||||
|
||||
if password_hash != password_response {
|
||||
debug!("Password authentication failed");
|
||||
warn!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", pool_name, username, application_name);
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(Error::ClientError);
|
||||
@@ -445,7 +446,7 @@ where
|
||||
}
|
||||
// Authenticate normal user.
|
||||
else {
|
||||
let pool = match get_pool(pool_name.clone(), username.clone()) {
|
||||
let pool = match get_pool(&pool_name, &username) {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
error_response(
|
||||
@@ -457,6 +458,7 @@ where
|
||||
)
|
||||
.await?;
|
||||
|
||||
warn!("Invalid pool name {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", pool_name, username, application_name);
|
||||
return Err(Error::ClientError);
|
||||
}
|
||||
};
|
||||
@@ -465,7 +467,7 @@ where
|
||||
let password_hash = md5_hash_password(&username, &pool.settings.user.password, &salt);
|
||||
|
||||
if password_hash != password_response {
|
||||
debug!("Password authentication failed");
|
||||
warn!("Invalid password {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", pool_name, username, application_name);
|
||||
wrong_password(&mut write, username).await?;
|
||||
|
||||
return Err(Error::ClientError);
|
||||
@@ -577,6 +579,12 @@ where
|
||||
// The query router determines where the query is going to go,
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
self.stats.client_register(
|
||||
self.process_id,
|
||||
self.pool_name.clone(),
|
||||
self.username.clone(),
|
||||
self.application_name.clone(),
|
||||
);
|
||||
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
@@ -640,7 +648,7 @@ where
|
||||
// Get a pool instance referenced by the most up-to-date
|
||||
// pointer. This ensures we always read the latest config
|
||||
// when starting a query.
|
||||
let pool = match get_pool(self.pool_name.clone(), self.username.clone()) {
|
||||
let pool = match get_pool(&self.pool_name, &self.username) {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
error_response(
|
||||
@@ -651,6 +659,8 @@ where
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
warn!("Invalid pool name {{ username: {:?}, pool_name: {:?}, application_name: {:?} }}", self.pool_name, self.username, self.application_name);
|
||||
return Err(Error::ClientError);
|
||||
}
|
||||
};
|
||||
@@ -764,15 +774,12 @@ where
|
||||
server.claim(self.process_id, self.secret_key);
|
||||
self.connected_to_server = true;
|
||||
|
||||
// Update statistics.
|
||||
if let Some(last_address_id) = self.last_address_id {
|
||||
self.stats
|
||||
.client_disconnecting(self.process_id, last_address_id);
|
||||
}
|
||||
self.stats.client_active(self.process_id, address.id);
|
||||
// Update statistics
|
||||
self.stats
|
||||
.client_active(self.process_id, server.server_id());
|
||||
|
||||
self.last_address_id = Some(address.id);
|
||||
self.last_server_id = Some(server.process_id());
|
||||
self.last_server_id = Some(server.server_id());
|
||||
|
||||
debug!(
|
||||
"Client {:?} talking to server {:?}",
|
||||
@@ -830,7 +837,7 @@ where
|
||||
|
||||
if !server.in_transaction() {
|
||||
// Report transaction executed statistics.
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
@@ -908,7 +915,7 @@ where
|
||||
self.buffer.clear();
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
@@ -943,7 +950,7 @@ where
|
||||
};
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
self.stats.transaction(self.process_id, server.server_id());
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
@@ -964,11 +971,11 @@ where
|
||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||
debug!("Releasing server back into the pool");
|
||||
server.checkin_cleanup().await?;
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
self.stats.server_idle(server.server_id());
|
||||
self.connected_to_server = false;
|
||||
|
||||
self.release();
|
||||
self.stats.client_idle(self.process_id, address.id);
|
||||
self.stats.client_idle(self.process_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -991,6 +998,7 @@ where
|
||||
self.send_server_message(server, message, &address, &pool)
|
||||
.await?;
|
||||
|
||||
let query_start = Instant::now();
|
||||
// Read all data the server has to offer, which can be multiple messages
|
||||
// buffered in 8196 bytes chunks.
|
||||
loop {
|
||||
@@ -1010,7 +1018,11 @@ where
|
||||
}
|
||||
|
||||
// Report query executed statistics.
|
||||
self.stats.query(self.process_id, address.id);
|
||||
self.stats.query(
|
||||
self.process_id,
|
||||
server.server_id(),
|
||||
Instant::now().duration_since(query_start).as_millis(),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1091,14 +1103,9 @@ impl<S, T> Drop for Client<S, T> {
|
||||
|
||||
// Dirty shutdown
|
||||
// TODO: refactor, this is not the best way to handle state management.
|
||||
if let Some(address_id) = self.last_address_id {
|
||||
self.stats.client_disconnecting(self.process_id, address_id);
|
||||
|
||||
if self.connected_to_server {
|
||||
if let Some(process_id) = self.last_server_id {
|
||||
self.stats.server_idle(process_id, address_id);
|
||||
}
|
||||
}
|
||||
self.stats.client_disconnecting(self.process_id);
|
||||
if self.connected_to_server && self.last_server_id.is_some() {
|
||||
self.stats.server_idle(self.last_server_id.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
456
src/config.rs
456
src/config.rs
@@ -3,7 +3,7 @@ use arc_swap::ArcSwap;
|
||||
use log::{error, info};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::hash::Hash;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
@@ -12,8 +12,9 @@ use tokio::io::AsyncReadExt;
|
||||
use toml;
|
||||
|
||||
use crate::errors::Error;
|
||||
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||
use crate::sharding::ShardingFunction;
|
||||
use crate::tls::{load_certs, load_keys};
|
||||
use crate::{ClientServerMap, ConnectionPool};
|
||||
|
||||
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
||||
@@ -23,7 +24,9 @@ static CONFIG: Lazy<ArcSwap<Config>> = Lazy::new(|| ArcSwap::from_pointee(Config
|
||||
/// Server role: primary or replica.
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize, Hash, std::cmp::Eq, Debug, Copy)]
|
||||
pub enum Role {
|
||||
#[serde(alias = "primary", alias = "Primary")]
|
||||
Primary,
|
||||
#[serde(alias = "replica", alias = "Replica")]
|
||||
Replica,
|
||||
}
|
||||
|
||||
@@ -120,11 +123,12 @@ impl Address {
|
||||
}
|
||||
|
||||
/// PostgreSQL user.
|
||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Serialize, Deserialize, Debug)]
|
||||
#[derive(Clone, PartialEq, Hash, Eq, Serialize, Deserialize, Debug)]
|
||||
pub struct User {
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub pool_size: u32,
|
||||
#[serde(default)] // 0
|
||||
pub statement_timeout: u64,
|
||||
}
|
||||
|
||||
@@ -142,34 +146,81 @@ impl Default for User {
|
||||
/// General configuration.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
pub struct General {
|
||||
#[serde(default = "General::default_host")]
|
||||
pub host: String,
|
||||
|
||||
#[serde(default = "General::default_port")]
|
||||
pub port: i16,
|
||||
|
||||
pub enable_prometheus_exporter: Option<bool>,
|
||||
pub prometheus_exporter_port: i16,
|
||||
|
||||
#[serde(default = "General::default_connect_timeout")]
|
||||
pub connect_timeout: u64,
|
||||
pub healthcheck_timeout: u64,
|
||||
|
||||
#[serde(default = "General::default_shutdown_timeout")]
|
||||
pub shutdown_timeout: u64,
|
||||
|
||||
#[serde(default = "General::default_healthcheck_timeout")]
|
||||
pub healthcheck_timeout: u64,
|
||||
|
||||
#[serde(default = "General::default_healthcheck_delay")]
|
||||
pub healthcheck_delay: u64,
|
||||
|
||||
#[serde(default = "General::default_ban_time")]
|
||||
pub ban_time: i64,
|
||||
|
||||
#[serde(default)] // False
|
||||
pub autoreload: bool,
|
||||
|
||||
pub tls_certificate: Option<String>,
|
||||
pub tls_private_key: Option<String>,
|
||||
pub admin_username: String,
|
||||
pub admin_password: String,
|
||||
}
|
||||
|
||||
impl General {
|
||||
pub fn default_host() -> String {
|
||||
"0.0.0.0".into()
|
||||
}
|
||||
|
||||
pub fn default_port() -> i16 {
|
||||
5432
|
||||
}
|
||||
|
||||
pub fn default_connect_timeout() -> u64 {
|
||||
1000
|
||||
}
|
||||
|
||||
pub fn default_shutdown_timeout() -> u64 {
|
||||
60000
|
||||
}
|
||||
|
||||
pub fn default_healthcheck_timeout() -> u64 {
|
||||
1000
|
||||
}
|
||||
|
||||
pub fn default_healthcheck_delay() -> u64 {
|
||||
30000
|
||||
}
|
||||
|
||||
pub fn default_ban_time() -> i64 {
|
||||
60
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for General {
|
||||
fn default() -> General {
|
||||
General {
|
||||
host: String::from("localhost"),
|
||||
port: 5432,
|
||||
host: Self::default_host(),
|
||||
port: Self::default_port(),
|
||||
enable_prometheus_exporter: Some(false),
|
||||
prometheus_exporter_port: 9930,
|
||||
connect_timeout: 5000,
|
||||
healthcheck_timeout: 1000,
|
||||
shutdown_timeout: 60000,
|
||||
healthcheck_delay: 30000,
|
||||
ban_time: 60,
|
||||
connect_timeout: General::default_connect_timeout(),
|
||||
shutdown_timeout: Self::default_shutdown_timeout(),
|
||||
healthcheck_timeout: Self::default_healthcheck_timeout(),
|
||||
healthcheck_delay: Self::default_healthcheck_delay(),
|
||||
ban_time: Self::default_ban_time(),
|
||||
autoreload: false,
|
||||
tls_certificate: None,
|
||||
tls_private_key: None,
|
||||
@@ -178,50 +229,166 @@ impl Default for General {
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
pub struct Pool {
|
||||
pub pool_mode: String,
|
||||
pub default_role: String,
|
||||
pub query_parser_enabled: bool,
|
||||
pub primary_reads_enabled: bool,
|
||||
pub sharding_function: String,
|
||||
pub shards: HashMap<String, Shard>,
|
||||
pub users: HashMap<String, User>,
|
||||
|
||||
/// Pool mode:
|
||||
/// - transaction: server serves one transaction,
|
||||
/// - session: server is attached to the client.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
|
||||
pub enum PoolMode {
|
||||
#[serde(alias = "transaction", alias = "Transaction")]
|
||||
Transaction,
|
||||
|
||||
#[serde(alias = "session", alias = "Session")]
|
||||
Session,
|
||||
}
|
||||
impl Default for Pool {
|
||||
fn default() -> Pool {
|
||||
Pool {
|
||||
pool_mode: String::from("transaction"),
|
||||
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
||||
users: HashMap::default(),
|
||||
default_role: String::from("any"),
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: true,
|
||||
sharding_function: "pg_bigint_hash".to_string(),
|
||||
|
||||
impl ToString for PoolMode {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
PoolMode::Transaction => "transaction".to_string(),
|
||||
PoolMode::Session => "session".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct Pool {
|
||||
#[serde(default = "Pool::default_pool_mode")]
|
||||
pub pool_mode: PoolMode,
|
||||
|
||||
pub default_role: String,
|
||||
|
||||
#[serde(default)] // False
|
||||
pub query_parser_enabled: bool,
|
||||
|
||||
#[serde(default)] // False
|
||||
pub primary_reads_enabled: bool,
|
||||
|
||||
pub connect_timeout: Option<u64>,
|
||||
|
||||
pub sharding_function: ShardingFunction,
|
||||
pub shards: BTreeMap<String, Shard>,
|
||||
pub users: BTreeMap<String, User>,
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
pub fn default_pool_mode() -> PoolMode {
|
||||
PoolMode::Transaction
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<(), Error> {
|
||||
match self.default_role.as_ref() {
|
||||
"any" => (),
|
||||
"primary" => (),
|
||||
"replica" => (),
|
||||
other => {
|
||||
error!(
|
||||
"Query router default_role must be 'primary', 'replica', or 'any', got: '{}'",
|
||||
other
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
for (shard_idx, shard) in &self.shards {
|
||||
match shard_idx.parse::<usize>() {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
error!(
|
||||
"Shard '{}' is not a valid number, shards must be numbered starting at 0",
|
||||
shard_idx
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
shard.validate()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Pool {
|
||||
fn default() -> Pool {
|
||||
Pool {
|
||||
pool_mode: Self::default_pool_mode(),
|
||||
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
|
||||
users: BTreeMap::default(),
|
||||
default_role: String::from("any"),
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: false,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
connect_timeout: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)]
|
||||
pub struct ServerConfig {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub role: Role,
|
||||
}
|
||||
|
||||
/// Shard configuration.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Hash, Eq)]
|
||||
pub struct Shard {
|
||||
pub database: String,
|
||||
pub servers: Vec<(String, u16, String)>,
|
||||
pub servers: Vec<ServerConfig>,
|
||||
}
|
||||
|
||||
impl Shard {
|
||||
pub fn validate(&self) -> Result<(), Error> {
|
||||
// We use addresses as unique identifiers,
|
||||
// let's make sure they are unique in the config as well.
|
||||
let mut dup_check = HashSet::new();
|
||||
let mut primary_count = 0;
|
||||
|
||||
if self.servers.len() == 0 {
|
||||
error!("Shard {} has no servers configured", self.database);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
for server in &self.servers {
|
||||
dup_check.insert(server);
|
||||
|
||||
// Check that we define only zero or one primary.
|
||||
match server.role {
|
||||
Role::Primary => primary_count += 1,
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
|
||||
if primary_count > 1 {
|
||||
error!(
|
||||
"Shard {} has more than on primary configured",
|
||||
self.database
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
if dup_check.len() != self.servers.len() {
|
||||
error!("Shard {} contains duplicate server configs", self.database);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Shard {
|
||||
fn default() -> Shard {
|
||||
Shard {
|
||||
servers: vec![(String::from("localhost"), 5432, String::from("primary"))],
|
||||
servers: vec![ServerConfig {
|
||||
host: String::from("localhost"),
|
||||
port: 5432,
|
||||
role: Role::Primary,
|
||||
}],
|
||||
database: String::from("postgres"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_path() -> String {
|
||||
String::from("pgcat.toml")
|
||||
}
|
||||
|
||||
/// Configuration wrapper.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
pub struct Config {
|
||||
@@ -229,24 +396,30 @@ pub struct Config {
|
||||
// so we should always put simple fields before nested fields
|
||||
// in all serializable structs to avoid ValueAfterTable errors
|
||||
// These errors occur when the toml serializer is about to produce
|
||||
// ambigous toml structure like the one below
|
||||
// ambiguous toml structure like the one below
|
||||
// [main]
|
||||
// field1_under_main = 1
|
||||
// field2_under_main = 2
|
||||
// [main.subconf]
|
||||
// field1_under_subconf = 1
|
||||
// field3_under_main = 3 # This field will be interpreted as being under subconf and not under main
|
||||
#[serde(default = "default_path")]
|
||||
#[serde(default = "Config::default_path")]
|
||||
pub path: String,
|
||||
|
||||
pub general: General,
|
||||
pub pools: HashMap<String, Pool>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn default_path() -> String {
|
||||
String::from("pgcat.toml")
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Config {
|
||||
Config {
|
||||
path: String::from("pgcat.toml"),
|
||||
path: Self::default_path(),
|
||||
general: General::default(),
|
||||
pools: HashMap::default(),
|
||||
}
|
||||
@@ -262,7 +435,7 @@ impl From<&Config> for std::collections::HashMap<String, String> {
|
||||
[
|
||||
(
|
||||
format!("pools.{}.pool_mode", pool_name),
|
||||
pool.pool_mode.clone(),
|
||||
pool.pool_mode.to_string(),
|
||||
),
|
||||
(
|
||||
format!("pools.{}.primary_reads_enabled", pool_name),
|
||||
@@ -278,7 +451,7 @@ impl From<&Config> for std::collections::HashMap<String, String> {
|
||||
),
|
||||
(
|
||||
format!("pools.{}.sharding_function", pool_name),
|
||||
pool.sharding_function.clone(),
|
||||
pool.sharding_function.to_string(),
|
||||
),
|
||||
(
|
||||
format!("pools.{:?}.shard_count", pool_name),
|
||||
@@ -370,10 +543,22 @@ impl Config {
|
||||
.sum::<u32>()
|
||||
.to_string()
|
||||
);
|
||||
info!("[pool: {}] Pool mode: {}", pool_name, pool_config.pool_mode);
|
||||
info!(
|
||||
"[pool: {}] Pool mode: {:?}",
|
||||
pool_name, pool_config.pool_mode
|
||||
);
|
||||
let connect_timeout = match pool_config.connect_timeout {
|
||||
Some(connect_timeout) => connect_timeout,
|
||||
None => self.general.connect_timeout,
|
||||
};
|
||||
info!(
|
||||
"[pool: {}] Connection timeout: {}ms",
|
||||
pool_name, connect_timeout
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Sharding function: {}",
|
||||
pool_name, pool_config.sharding_function
|
||||
pool_name,
|
||||
pool_config.sharding_function.to_string()
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Primary reads: {}",
|
||||
@@ -406,6 +591,50 @@ impl Config {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn validate(&mut self) -> Result<(), Error> {
|
||||
// Validate TLS!
|
||||
match self.general.tls_certificate.clone() {
|
||||
Some(tls_certificate) => {
|
||||
match load_certs(&Path::new(&tls_certificate)) {
|
||||
Ok(_) => {
|
||||
// Cert is okay, but what about the private key?
|
||||
match self.general.tls_private_key.clone() {
|
||||
Some(tls_private_key) => {
|
||||
match load_keys(&Path::new(&tls_private_key)) {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
error!(
|
||||
"tls_private_key is incorrectly configured: {:?}",
|
||||
err
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
error!("tls_certificate is set, but the tls_private_key is not");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
error!("tls_certificate is incorrectly configured: {:?}", err);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => (),
|
||||
};
|
||||
|
||||
for (_, pool) in &mut self.pools {
|
||||
pool.validate()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a read-only instance of the configuration
|
||||
@@ -442,132 +671,7 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
// Validate TLS!
|
||||
match config.general.tls_certificate.clone() {
|
||||
Some(tls_certificate) => {
|
||||
match load_certs(&Path::new(&tls_certificate)) {
|
||||
Ok(_) => {
|
||||
// Cert is okay, but what about the private key?
|
||||
match config.general.tls_private_key.clone() {
|
||||
Some(tls_private_key) => match load_keys(&Path::new(&tls_private_key)) {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
error!("tls_private_key is incorrectly configured: {:?}", err);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
},
|
||||
|
||||
None => {
|
||||
error!("tls_certificate is set, but the tls_private_key is not");
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
error!("tls_certificate is incorrectly configured: {:?}", err);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => (),
|
||||
};
|
||||
|
||||
for (pool_name, pool) in &config.pools {
|
||||
match pool.sharding_function.as_ref() {
|
||||
"pg_bigint_hash" => (),
|
||||
"sha1" => (),
|
||||
_ => {
|
||||
error!(
|
||||
"Supported sharding functions are: 'pg_bigint_hash', 'sha1', got: '{}' in pool {} settings",
|
||||
pool.sharding_function,
|
||||
pool_name
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
match pool.default_role.as_ref() {
|
||||
"any" => (),
|
||||
"primary" => (),
|
||||
"replica" => (),
|
||||
other => {
|
||||
error!(
|
||||
"Query router default_role must be 'primary', 'replica', or 'any', got: '{}'",
|
||||
other
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
match pool.pool_mode.as_ref() {
|
||||
"transaction" => (),
|
||||
"session" => (),
|
||||
other => {
|
||||
error!(
|
||||
"pool_mode can be 'session' or 'transaction', got: '{}'",
|
||||
other
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
for shard in &pool.shards {
|
||||
// We use addresses as unique identifiers,
|
||||
// let's make sure they are unique in the config as well.
|
||||
let mut dup_check = HashSet::new();
|
||||
let mut primary_count = 0;
|
||||
|
||||
match shard.0.parse::<usize>() {
|
||||
Ok(_) => (),
|
||||
Err(_) => {
|
||||
error!(
|
||||
"Shard '{}' is not a valid number, shards must be numbered starting at 0",
|
||||
shard.0
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
if shard.1.servers.len() == 0 {
|
||||
error!("Shard {} has no servers configured", shard.0);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
for server in &shard.1.servers {
|
||||
dup_check.insert(server);
|
||||
|
||||
// Check that we define only zero or one primary.
|
||||
match server.2.as_ref() {
|
||||
"primary" => primary_count += 1,
|
||||
_ => (),
|
||||
};
|
||||
|
||||
// Check role spelling.
|
||||
match server.2.as_ref() {
|
||||
"primary" => (),
|
||||
"replica" => (),
|
||||
_ => {
|
||||
error!(
|
||||
"Shard {} server role must be either 'primary' or 'replica', got: '{}'",
|
||||
shard.0, server.2
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if primary_count > 1 {
|
||||
error!("Shard {} has more than on primary configured", &shard.0);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
if dup_check.len() != shard.1.servers.len() {
|
||||
error!("Shard {} contains duplicate server configs", &shard.0);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
}
|
||||
}
|
||||
config.validate()?;
|
||||
|
||||
config.path = path.to_string();
|
||||
|
||||
@@ -589,7 +693,7 @@ pub async fn reload_config(client_server_map: ClientServerMap) -> Result<bool, E
|
||||
let new_config = get_config();
|
||||
|
||||
if old_config.pools != new_config.pools {
|
||||
info!("Pool configuration changed, re-creating server pools");
|
||||
info!("Pool configuration changed");
|
||||
ConnectionPool::from_config(client_server_map).await?;
|
||||
Ok(true)
|
||||
} else if old_config != new_config {
|
||||
@@ -617,12 +721,12 @@ mod test {
|
||||
assert_eq!(get_config().pools["simple_db"].users.len(), 1);
|
||||
|
||||
assert_eq!(
|
||||
get_config().pools["sharded_db"].shards["0"].servers[0].0,
|
||||
get_config().pools["sharded_db"].shards["0"].servers[0].host,
|
||||
"127.0.0.1"
|
||||
);
|
||||
assert_eq!(
|
||||
get_config().pools["sharded_db"].shards["1"].servers[0].2,
|
||||
"primary"
|
||||
get_config().pools["sharded_db"].shards["1"].servers[0].role,
|
||||
Role::Primary
|
||||
);
|
||||
assert_eq!(
|
||||
get_config().pools["sharded_db"].shards["1"].database,
|
||||
@@ -640,11 +744,11 @@ mod test {
|
||||
assert_eq!(get_config().pools["sharded_db"].default_role, "any");
|
||||
|
||||
assert_eq!(
|
||||
get_config().pools["simple_db"].shards["0"].servers[0].0,
|
||||
get_config().pools["simple_db"].shards["0"].servers[0].host,
|
||||
"127.0.0.1"
|
||||
);
|
||||
assert_eq!(
|
||||
get_config().pools["simple_db"].shards["0"].servers[0].1,
|
||||
get_config().pools["simple_db"].shards["0"].servers[0].port,
|
||||
5432
|
||||
);
|
||||
assert_eq!(
|
||||
|
||||
32
src/lib.rs
Normal file
32
src/lib.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
pub mod config;
|
||||
pub mod constants;
|
||||
pub mod errors;
|
||||
pub mod messages;
|
||||
pub mod pool;
|
||||
pub mod scram;
|
||||
pub mod server;
|
||||
pub mod sharding;
|
||||
pub mod stats;
|
||||
pub mod tls;
|
||||
|
||||
/// Format chrono::Duration to be more human-friendly.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `duration` - A duration of time
|
||||
pub fn format_duration(duration: &chrono::Duration) -> String {
|
||||
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
|
||||
|
||||
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
|
||||
|
||||
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
|
||||
|
||||
let hours = format!("{:0>2}", duration.num_hours() % 24);
|
||||
|
||||
let days = duration.num_days().to_string();
|
||||
|
||||
format!(
|
||||
"{}d {}:{}:{}.{}",
|
||||
days, hours, minutes, seconds, milliseconds
|
||||
)
|
||||
}
|
||||
27
src/main.rs
27
src/main.rs
@@ -37,8 +37,9 @@ extern crate tokio;
|
||||
extern crate tokio_rustls;
|
||||
extern crate toml;
|
||||
|
||||
use log::{debug, error, info};
|
||||
use log::{error, info, warn};
|
||||
use parking_lot::Mutex;
|
||||
use pgcat::format_duration;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::{
|
||||
signal::unix::{signal as unix_signal, SignalKind},
|
||||
@@ -278,7 +279,7 @@ async fn main() {
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Client disconnected with error {:?}", err);
|
||||
warn!("Client disconnected with error {:?}", err);
|
||||
}
|
||||
};
|
||||
});
|
||||
@@ -301,25 +302,3 @@ async fn main() {
|
||||
|
||||
info!("Shutting down...");
|
||||
}
|
||||
|
||||
/// Format chrono::Duration to be more human-friendly.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `duration` - A duration of time
|
||||
fn format_duration(duration: &chrono::Duration) -> String {
|
||||
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
|
||||
|
||||
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
|
||||
|
||||
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
|
||||
|
||||
let hours = format!("{:0>2}", duration.num_hours() % 24);
|
||||
|
||||
let days = duration.num_days().to_string();
|
||||
|
||||
format!(
|
||||
"{}d {}:{}:{}.{}",
|
||||
days, hours, minutes, seconds, milliseconds
|
||||
)
|
||||
}
|
||||
|
||||
@@ -496,7 +496,7 @@ where
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
pub fn server_paramater_message(key: &str, value: &str) -> BytesMut {
|
||||
pub fn server_parameter_message(key: &str, value: &str) -> BytesMut {
|
||||
let mut server_info = BytesMut::new();
|
||||
|
||||
let null_byte_size = 1;
|
||||
|
||||
177
src/pool.rs
177
src/pool.rs
@@ -8,39 +8,50 @@ use once_cell::sync::Lazy;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::config::{get_config, Address, Role, User};
|
||||
use crate::config::{get_config, Address, PoolMode, Role, User};
|
||||
use crate::errors::Error;
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::sharding::ShardingFunction;
|
||||
use crate::stats::{get_reporter, Reporter};
|
||||
|
||||
pub type ProcessId = i32;
|
||||
pub type SecretKey = i32;
|
||||
pub type ServerHost = String;
|
||||
pub type ServerPort = u16;
|
||||
|
||||
pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
|
||||
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, u16)>>>;
|
||||
pub type PoolMap = HashMap<(String, String), ConnectionPool>;
|
||||
pub type ClientServerMap =
|
||||
Arc<Mutex<HashMap<(ProcessId, SecretKey), (ProcessId, SecretKey, ServerHost, ServerPort)>>>;
|
||||
pub type PoolMap = HashMap<PoolIdentifier, ConnectionPool>;
|
||||
/// The connection pool, globally available.
|
||||
/// This is atomic and safe and read-optimized.
|
||||
/// The pool is recreated dynamically when the config is reloaded.
|
||||
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
|
||||
static POOLS_HASH: Lazy<ArcSwap<HashSet<crate::config::Pool>>> =
|
||||
Lazy::new(|| ArcSwap::from_pointee(HashSet::default()));
|
||||
|
||||
/// Pool mode:
|
||||
/// - transaction: server serves one transaction,
|
||||
/// - session: server is attached to the client.
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum PoolMode {
|
||||
Session,
|
||||
Transaction,
|
||||
/// An identifier for a PgCat pool,
|
||||
/// a database visible to clients.
|
||||
#[derive(Hash, Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PoolIdentifier {
|
||||
// The name of the database clients want to connect to.
|
||||
pub db: String,
|
||||
|
||||
/// The username the client connects with. Each user gets its own pool.
|
||||
pub user: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PoolMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match *self {
|
||||
PoolMode::Session => write!(f, "session"),
|
||||
PoolMode::Transaction => write!(f, "transaction"),
|
||||
impl PoolIdentifier {
|
||||
/// Create a new user/pool identifier.
|
||||
pub fn new(db: &str, user: &str) -> PoolIdentifier {
|
||||
PoolIdentifier {
|
||||
db: db.to_string(),
|
||||
user: user.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -119,9 +130,37 @@ impl ConnectionPool {
|
||||
let mut new_pools = HashMap::new();
|
||||
let mut address_id = 0;
|
||||
|
||||
let mut pools_hash = (*(*POOLS_HASH.load())).clone();
|
||||
|
||||
for (pool_name, pool_config) in &config.pools {
|
||||
let changed = pools_hash.insert(pool_config.clone());
|
||||
|
||||
// There is one pool per database/user pair.
|
||||
for (_, user) in &pool_config.users {
|
||||
// If the pool hasn't changed, get existing reference and insert it into the new_pools.
|
||||
// We replace all pools at the end, but if the reference is kept, the pool won't get re-created (bb8).
|
||||
if !changed {
|
||||
match get_pool(&pool_name, &user.username) {
|
||||
Some(pool) => {
|
||||
info!(
|
||||
"[pool: {}][user: {}] has not changed",
|
||||
pool_name, user.username
|
||||
);
|
||||
new_pools.insert(
|
||||
PoolIdentifier::new(&pool_name, &user.username),
|
||||
pool.clone(),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
None => (),
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"[pool: {}][user: {}] creating new pool",
|
||||
pool_name, user.username
|
||||
);
|
||||
|
||||
let mut shards = Vec::new();
|
||||
let mut addresses = Vec::new();
|
||||
let mut banlist = Vec::new();
|
||||
@@ -143,21 +182,12 @@ impl ConnectionPool {
|
||||
let mut replica_number = 0;
|
||||
|
||||
for server in shard.servers.iter() {
|
||||
let role = match server.2.as_ref() {
|
||||
"primary" => Role::Primary,
|
||||
"replica" => Role::Replica,
|
||||
_ => {
|
||||
error!("Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2);
|
||||
Role::Replica
|
||||
}
|
||||
};
|
||||
|
||||
let address = Address {
|
||||
id: address_id,
|
||||
database: shard.database.clone(),
|
||||
host: server.0.clone(),
|
||||
port: server.1 as u16,
|
||||
role: role,
|
||||
host: server.host.clone(),
|
||||
port: server.port,
|
||||
role: server.role,
|
||||
address_index,
|
||||
replica_number,
|
||||
shard: shard_idx.parse::<usize>().unwrap(),
|
||||
@@ -168,7 +198,7 @@ impl ConnectionPool {
|
||||
address_id += 1;
|
||||
address_index += 1;
|
||||
|
||||
if role == Role::Replica {
|
||||
if server.role == Role::Replica {
|
||||
replica_number += 1;
|
||||
}
|
||||
|
||||
@@ -180,11 +210,14 @@ impl ConnectionPool {
|
||||
get_reporter(),
|
||||
);
|
||||
|
||||
let connect_timeout = match pool_config.connect_timeout {
|
||||
Some(connect_timeout) => connect_timeout,
|
||||
None => config.general.connect_timeout,
|
||||
};
|
||||
|
||||
let pool = Pool::builder()
|
||||
.max_size(user.pool_size)
|
||||
.connection_timeout(std::time::Duration::from_millis(
|
||||
config.general.connect_timeout,
|
||||
))
|
||||
.connection_timeout(std::time::Duration::from_millis(connect_timeout))
|
||||
.test_on_check_out(false)
|
||||
.build(manager)
|
||||
.await
|
||||
@@ -208,11 +241,7 @@ impl ConnectionPool {
|
||||
stats: get_reporter(),
|
||||
server_info: BytesMut::new(),
|
||||
settings: PoolSettings {
|
||||
pool_mode: match pool_config.pool_mode.as_str() {
|
||||
"transaction" => PoolMode::Transaction,
|
||||
"session" => PoolMode::Session,
|
||||
_ => unreachable!(),
|
||||
},
|
||||
pool_mode: pool_config.pool_mode,
|
||||
// shards: pool_config.shards.clone(),
|
||||
shards: shard_ids.len(),
|
||||
user: user.clone(),
|
||||
@@ -224,11 +253,7 @@ impl ConnectionPool {
|
||||
},
|
||||
query_parser_enabled: pool_config.query_parser_enabled.clone(),
|
||||
primary_reads_enabled: pool_config.primary_reads_enabled,
|
||||
sharding_function: match pool_config.sharding_function.as_str() {
|
||||
"pg_bigint_hash" => ShardingFunction::PgBigintHash,
|
||||
"sha1" => ShardingFunction::Sha1,
|
||||
_ => unreachable!(),
|
||||
},
|
||||
sharding_function: pool_config.sharding_function,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -243,11 +268,12 @@ impl ConnectionPool {
|
||||
};
|
||||
|
||||
// There is one pool per database/user pair.
|
||||
new_pools.insert((pool_name.clone(), user.username.clone()), pool);
|
||||
new_pools.insert(PoolIdentifier::new(&pool_name, &user.username), pool);
|
||||
}
|
||||
}
|
||||
|
||||
POOLS.store(Arc::new(new_pools.clone()));
|
||||
POOLS_HASH.store(Arc::new(pools_hash.clone()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -307,7 +333,6 @@ impl ConnectionPool {
|
||||
role: Option<Role>, // primary or replica
|
||||
process_id: i32, // client id
|
||||
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
|
||||
let now = Instant::now();
|
||||
let mut candidates: Vec<&Address> = self.addresses[shard]
|
||||
.iter()
|
||||
.filter(|address| address.role == role)
|
||||
@@ -332,7 +357,8 @@ impl ConnectionPool {
|
||||
}
|
||||
|
||||
// Indicate we're waiting on a server connection from a pool.
|
||||
self.stats.client_waiting(process_id, address.id);
|
||||
let now = Instant::now();
|
||||
self.stats.client_waiting(process_id);
|
||||
|
||||
// Check if we can connect
|
||||
let mut conn = match self.databases[address.shard][address.address_index]
|
||||
@@ -343,8 +369,7 @@ impl ConnectionPool {
|
||||
Err(err) => {
|
||||
error!("Banning instance {:?}, error: {:?}", address, err);
|
||||
self.ban(&address, process_id);
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.client_checkout_error(process_id, address.id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -361,27 +386,30 @@ impl ConnectionPool {
|
||||
// Health checks are pretty expensive.
|
||||
if !require_healthcheck {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, server.server_id());
|
||||
self.stats.server_active(process_id, server.server_id());
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
debug!("Running health check on server {:?}", address);
|
||||
|
||||
self.stats.server_tested(server.process_id(), address.id);
|
||||
self.stats.server_tested(server.server_id());
|
||||
|
||||
match tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(healthcheck_timeout),
|
||||
server.query(";"), // Cheap query (query parser not used in PG)
|
||||
server.query(";"), // Cheap query as it skips the query planner
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Check if health check succeeded.
|
||||
Ok(res) => match res {
|
||||
Ok(_) => {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
self.stats.checkout_time(
|
||||
now.elapsed().as_micros(),
|
||||
process_id,
|
||||
conn.server_id(),
|
||||
);
|
||||
self.stats.server_active(process_id, conn.server_id());
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
@@ -421,10 +449,9 @@ impl ConnectionPool {
|
||||
/// Ban an address (i.e. replica). It no longer will serve
|
||||
/// traffic for any new transactions. Existing transactions on that replica
|
||||
/// will finish successfully or error out to the clients.
|
||||
pub fn ban(&self, address: &Address, process_id: i32) {
|
||||
self.stats.client_disconnecting(process_id, address.id);
|
||||
|
||||
pub fn ban(&self, address: &Address, client_id: i32) {
|
||||
error!("Banning {:?}", address);
|
||||
self.stats.client_ban_error(client_id, address.id);
|
||||
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
let mut guard = self.banlist.write();
|
||||
@@ -560,14 +587,20 @@ impl ManageConnection for ServerPool {
|
||||
/// Attempts to create a new connection.
|
||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||
info!("Creating a new server connection {:?}", self.address);
|
||||
let server_id = rand::random::<i32>();
|
||||
|
||||
// Put a temporary process_id into the stats
|
||||
// for server login.
|
||||
let process_id = rand::random::<i32>();
|
||||
self.stats.server_login(process_id, self.address.id);
|
||||
self.stats.server_register(
|
||||
server_id,
|
||||
self.address.id,
|
||||
self.address.name(),
|
||||
self.address.pool_name.clone(),
|
||||
self.address.username.clone(),
|
||||
);
|
||||
self.stats.server_login(server_id);
|
||||
|
||||
// Connect to the PostgreSQL server.
|
||||
match Server::startup(
|
||||
server_id,
|
||||
&self.address,
|
||||
&self.user,
|
||||
&self.database,
|
||||
@@ -577,20 +610,18 @@ impl ManageConnection for ServerPool {
|
||||
.await
|
||||
{
|
||||
Ok(conn) => {
|
||||
// Remove the temporary process_id from the stats.
|
||||
self.stats.server_disconnecting(process_id, self.address.id);
|
||||
self.stats.server_idle(server_id);
|
||||
Ok(conn)
|
||||
}
|
||||
Err(err) => {
|
||||
// Remove the temporary process_id from the stats.
|
||||
self.stats.server_disconnecting(process_id, self.address.id);
|
||||
self.stats.server_disconnecting(server_id);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines if the connection is still connected to the database.
|
||||
async fn is_valid(&self, _conn: &mut PooledConnection<'_, Self>) -> Result<(), Self::Error> {
|
||||
async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -601,13 +632,18 @@ impl ManageConnection for ServerPool {
|
||||
}
|
||||
|
||||
/// Get the connection pool
|
||||
pub fn get_pool(db: String, user: String) -> Option<ConnectionPool> {
|
||||
match get_all_pools().get(&(db, user)) {
|
||||
pub fn get_pool(db: &str, user: &str) -> Option<ConnectionPool> {
|
||||
match get_all_pools().get(&PoolIdentifier::new(&db, &user)) {
|
||||
Some(pool) => Some(pool.clone()),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a pointer to all configured pools.
|
||||
pub fn get_all_pools() -> HashMap<PoolIdentifier, ConnectionPool> {
|
||||
return (*(*POOLS.load())).clone();
|
||||
}
|
||||
|
||||
/// How many total servers we have in the config.
|
||||
pub fn get_number_of_addresses() -> usize {
|
||||
get_all_pools()
|
||||
@@ -615,8 +651,3 @@ pub fn get_number_of_addresses() -> usize {
|
||||
.map(|(_, pool)| pool.databases())
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Get a pointer to all configured pools.
|
||||
pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> {
|
||||
return (*(*POOLS.load())).clone();
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::net::SocketAddr;
|
||||
|
||||
use crate::config::Address;
|
||||
use crate::pool::get_all_pools;
|
||||
use crate::stats::get_stats;
|
||||
use crate::stats::get_address_stats;
|
||||
|
||||
struct MetricHelpType {
|
||||
help: &'static str,
|
||||
@@ -164,7 +164,7 @@ impl PrometheusMetric {
|
||||
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
|
||||
match (request.method(), request.uri().path()) {
|
||||
(&Method::GET, "/metrics") => {
|
||||
let stats = get_stats();
|
||||
let stats: HashMap<usize, HashMap<String, i64>> = get_address_stats();
|
||||
|
||||
let mut lines = Vec::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
|
||||
@@ -169,8 +169,8 @@ impl QueryRouter {
|
||||
|
||||
Command::ShowShard => self.shard().to_string(),
|
||||
Command::ShowServerRole => match self.active_role {
|
||||
Some(Role::Primary) => String::from("primary"),
|
||||
Some(Role::Replica) => String::from("replica"),
|
||||
Some(Role::Primary) => Role::Primary.to_string(),
|
||||
Some(Role::Replica) => Role::Replica.to_string(),
|
||||
None => {
|
||||
if self.query_parser_enabled {
|
||||
String::from("auto")
|
||||
@@ -359,8 +359,8 @@ impl QueryRouter {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::config::PoolMode;
|
||||
use crate::messages::simple_query;
|
||||
use crate::pool::PoolMode;
|
||||
use crate::sharding::ShardingFunction;
|
||||
use bytes::BufMut;
|
||||
|
||||
|
||||
@@ -14,12 +14,14 @@ use crate::config::{Address, User};
|
||||
use crate::constants::*;
|
||||
use crate::errors::Error;
|
||||
use crate::messages::*;
|
||||
use crate::pool::ClientServerMap;
|
||||
use crate::scram::ScramSha256;
|
||||
use crate::stats::Reporter;
|
||||
use crate::ClientServerMap;
|
||||
|
||||
/// Server state.
|
||||
pub struct Server {
|
||||
server_id: i32,
|
||||
|
||||
/// Server host, e.g. localhost,
|
||||
/// port, e.g. 5432, and role, e.g. primary or replica.
|
||||
address: Address,
|
||||
@@ -72,6 +74,7 @@ impl Server {
|
||||
/// Pretend to be the Postgres client and connect to the server given host, port and credentials.
|
||||
/// Perform the authentication and return the server in a ready for query state.
|
||||
pub async fn startup(
|
||||
server_id: i32,
|
||||
address: &Address,
|
||||
user: &User,
|
||||
database: &str,
|
||||
@@ -315,6 +318,7 @@ impl Server {
|
||||
write: write,
|
||||
buffer: BytesMut::with_capacity(8196),
|
||||
server_info: server_info,
|
||||
server_id: server_id,
|
||||
process_id: process_id,
|
||||
secret_key: secret_key,
|
||||
in_transaction: false,
|
||||
@@ -372,8 +376,7 @@ impl Server {
|
||||
|
||||
/// Send messages to the server from the client.
|
||||
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
||||
self.stats
|
||||
.data_sent(messages.len(), self.process_id, self.address.id);
|
||||
self.stats.data_sent(messages.len(), self.server_id);
|
||||
|
||||
match write_all_half(&mut self.write, messages).await {
|
||||
Ok(_) => {
|
||||
@@ -505,8 +508,7 @@ impl Server {
|
||||
let bytes = self.buffer.clone();
|
||||
|
||||
// Keep track of how much data we got from the server for stats.
|
||||
self.stats
|
||||
.data_received(bytes.len(), self.process_id, self.address.id);
|
||||
self.stats.data_received(bytes.len(), self.server_id);
|
||||
|
||||
// Clear the buffer for next query.
|
||||
self.buffer.clear();
|
||||
@@ -589,6 +591,7 @@ impl Server {
|
||||
// server connection thrashing if clients repeatedly do this.
|
||||
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
|
||||
if self.in_transaction() {
|
||||
warn!("Server returned while still in transaction, rolling back transaction");
|
||||
self.query("ROLLBACK").await?;
|
||||
}
|
||||
|
||||
@@ -598,6 +601,7 @@ impl Server {
|
||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||
// it before each checkin.
|
||||
if self.needs_cleanup {
|
||||
warn!("Server returned with session state altered, discarding state");
|
||||
self.query("DISCARD ALL").await?;
|
||||
self.needs_cleanup = false;
|
||||
}
|
||||
@@ -629,9 +633,10 @@ impl Server {
|
||||
self.address.clone()
|
||||
}
|
||||
|
||||
/// Get the server's unique identifier.
|
||||
pub fn process_id(&self) -> i32 {
|
||||
self.process_id
|
||||
/// Get the server connection identifier
|
||||
/// Used to uniquely identify connection in statistics
|
||||
pub fn server_id(&self) -> i32 {
|
||||
self.server_id
|
||||
}
|
||||
|
||||
// Get server's latest response timestamp
|
||||
@@ -650,8 +655,7 @@ impl Drop for Server {
|
||||
/// the socket is in non-blocking mode, so it may not be ready
|
||||
/// for a write.
|
||||
fn drop(&mut self) {
|
||||
self.stats
|
||||
.server_disconnecting(self.process_id(), self.address.id);
|
||||
self.stats.server_disconnecting(self.server_id);
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(4);
|
||||
bytes.put_u8(b'X');
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
/// Implements various sharding functions.
|
||||
use sha1::{Digest, Sha1};
|
||||
|
||||
@@ -5,12 +6,23 @@ use sha1::{Digest, Sha1};
|
||||
const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD;
|
||||
|
||||
/// The sharding functions we support.
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize, Hash, std::cmp::Eq)]
|
||||
pub enum ShardingFunction {
|
||||
#[serde(alias = "pg_bigint_hash", alias = "PgBigintHash")]
|
||||
PgBigintHash,
|
||||
#[serde(alias = "sha1", alias = "Sha1")]
|
||||
Sha1,
|
||||
}
|
||||
|
||||
impl ToString for ShardingFunction {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
|
||||
ShardingFunction::Sha1 => "sha1".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The sharder.
|
||||
pub struct Sharder {
|
||||
/// Number of shards in the cluster.
|
||||
|
||||
1126
src/stats.rs
1126
src/stats.rs
File diff suppressed because it is too large
Load Diff
289
tests/ruby/admin_spec.rb
Normal file
289
tests/ruby/admin_spec.rb
Normal file
@@ -0,0 +1,289 @@
|
||||
# frozen_string_literal: true
|
||||
require 'uri'
|
||||
require_relative 'spec_helper'
|
||||
|
||||
describe "Admin" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
|
||||
let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }
|
||||
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
describe "SHOW STATS" do
|
||||
context "clients connect and make one query" do
|
||||
it "updates *_query_time and *_wait_time" do
|
||||
connection = PG::connect("#{pgcat_conn_str}?application_name=one_query")
|
||||
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||
connection.close
|
||||
|
||||
# wait for averages to be calculated, we shouldn't do this too often
|
||||
sleep(15.5)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||
admin_conn.close
|
||||
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
||||
expect(results["avg_query_time"].to_i).to_not eq(0)
|
||||
|
||||
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "SHOW POOLS" do
|
||||
context "bad credentials" do
|
||||
it "does not change any stats" do
|
||||
bad_passsword_url = URI(pgcat_conn_str)
|
||||
bad_passsword_url.password = "wrong"
|
||||
expect { PG::connect("#{bad_passsword_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
|
||||
|
||||
sleep(1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
end
|
||||
end
|
||||
|
||||
context "bad database name" do
|
||||
it "does not change any stats" do
|
||||
bad_db_url = URI(pgcat_conn_str)
|
||||
bad_db_url.path = "/wrong_db"
|
||||
expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad)
|
||||
|
||||
sleep(1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
end
|
||||
end
|
||||
|
||||
context "client connects but issues no queries" do
|
||||
it "only affects cl_idle stats" do
|
||||
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
|
||||
sleep(1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("20")
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
|
||||
connections.map(&:close)
|
||||
sleep(1.1)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
end
|
||||
end
|
||||
|
||||
context "clients connect and make one query" do
|
||||
it "only affects cl_idle, sv_idle stats" do
|
||||
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
Thread.new { c.async_exec("SELECT pg_sleep(2.5)") }
|
||||
end
|
||||
|
||||
sleep(1.1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_active"]).to eq("5")
|
||||
expect(results["sv_active"]).to eq("5")
|
||||
|
||||
sleep(3)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("5")
|
||||
expect(results["sv_idle"]).to eq("5")
|
||||
|
||||
connections.map(&:close)
|
||||
sleep(1)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["sv_idle"]).to eq("5")
|
||||
end
|
||||
end
|
||||
|
||||
context "client connects and opens a transaction and closes connection uncleanly" do
|
||||
it "produces correct statistics" do
|
||||
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
Thread.new do
|
||||
c.async_exec("BEGIN")
|
||||
c.async_exec("SELECT pg_sleep(0.01)")
|
||||
c.close
|
||||
end
|
||||
end
|
||||
|
||||
sleep(1.1)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["sv_idle"]).to eq("5")
|
||||
end
|
||||
end
|
||||
|
||||
context "client fail to checkout connection from the pool" do
|
||||
it "counts clients as idle" do
|
||||
new_configs = processes.pgcat.current_config
|
||||
new_configs["general"]["connect_timeout"] = 500
|
||||
new_configs["general"]["ban_time"] = 1
|
||||
new_configs["general"]["shutdown_timeout"] = 1
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
|
||||
threads = []
|
||||
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError }
|
||||
end
|
||||
|
||||
sleep(2)
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("5")
|
||||
expect(results["sv_idle"]).to eq("1")
|
||||
|
||||
threads.map(&:join)
|
||||
connections.map(&:close)
|
||||
end
|
||||
end
|
||||
|
||||
context "clients overwhelm server pools" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
|
||||
|
||||
it "cl_waiting is updated to show it" do
|
||||
threads = []
|
||||
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
|
||||
end
|
||||
|
||||
sleep(1.1) # Allow time for stats to update
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
|
||||
expect(results["cl_waiting"]).to eq("2")
|
||||
expect(results["cl_active"]).to eq("2")
|
||||
expect(results["sv_active"]).to eq("2")
|
||||
|
||||
sleep(2.5) # Allow time for stats to update
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
|
||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||
end
|
||||
expect(results["cl_idle"]).to eq("4")
|
||||
expect(results["sv_idle"]).to eq("2")
|
||||
|
||||
threads.map(&:join)
|
||||
connections.map(&:close)
|
||||
end
|
||||
|
||||
it "show correct max_wait" do
|
||||
threads = []
|
||||
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||
connections.each do |c|
|
||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
|
||||
end
|
||||
|
||||
sleep(2.5) # Allow time for stats to update
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
|
||||
expect(results["maxwait"]).to eq("1")
|
||||
expect(results["maxwait_us"].to_i).to be_within(100_000).of(500_000)
|
||||
|
||||
sleep(4.5) # Allow time for stats to update
|
||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||
expect(results["maxwait"]).to eq("0")
|
||||
|
||||
threads.map(&:join)
|
||||
connections.map(&:close)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "SHOW CLIENTS" do
|
||||
it "reports correct number and application names" do
|
||||
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
|
||||
connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") }
|
||||
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
|
||||
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||
expect(results.count).to eq(21) # count admin clients
|
||||
expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8)
|
||||
expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1)
|
||||
|
||||
connections[0..5].map(&:close)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||
expect(results.count).to eq(15)
|
||||
|
||||
connections[6..].map(&:close)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1)
|
||||
admin_conn.close
|
||||
end
|
||||
|
||||
it "reports correct number of queries and transactions" do
|
||||
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
|
||||
|
||||
connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") }
|
||||
connections.each do |c|
|
||||
c.async_exec("SELECT 1")
|
||||
c.async_exec("SELECT 2")
|
||||
c.async_exec("SELECT 3")
|
||||
c.async_exec("BEGIN")
|
||||
c.async_exec("SELECT 4")
|
||||
c.async_exec("SELECT 5")
|
||||
c.async_exec("COMMIT")
|
||||
end
|
||||
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
sleep(1) # Wait for stats to be updated
|
||||
|
||||
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||
expect(results.count).to eq(3)
|
||||
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
|
||||
expect(normal_client_results[0]["transaction_count"]).to eq("4")
|
||||
expect(normal_client_results[1]["transaction_count"]).to eq("4")
|
||||
expect(normal_client_results[0]["query_count"]).to eq("7")
|
||||
expect(normal_client_results[1]["query_count"]).to eq("7")
|
||||
|
||||
admin_conn.close
|
||||
connections.map(&:close)
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -46,6 +46,50 @@ module Helpers
|
||||
end
|
||||
end
|
||||
|
||||
def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
"pool_size" => pool_size,
|
||||
"statement_timeout" => 0,
|
||||
"username" => "sharding_user"
|
||||
}
|
||||
|
||||
pgcat = PgcatProcess.new("trace")
|
||||
pgcat_cfg = pgcat.current_config
|
||||
|
||||
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||
|
||||
# Main proxy configs
|
||||
pgcat_cfg["pools"] = {
|
||||
"#{pool_name}" => {
|
||||
"default_role" => "primary",
|
||||
"pool_mode" => pool_mode,
|
||||
"primary_reads_enabled" => false,
|
||||
"query_parser_enabled" => false,
|
||||
"sharding_function" => "pg_bigint_hash",
|
||||
"shards" => {
|
||||
"0" => {
|
||||
"database" => "shard0",
|
||||
"servers" => [
|
||||
["localhost", primary.port.to_s, "primary"]
|
||||
]
|
||||
},
|
||||
},
|
||||
"users" => { "0" => user }
|
||||
}
|
||||
}
|
||||
pgcat_cfg["general"]["port"] = pgcat.port
|
||||
pgcat.update_config(pgcat_cfg)
|
||||
pgcat.start
|
||||
pgcat.wait_until_ready
|
||||
|
||||
OpenStruct.new.tap do |struct|
|
||||
struct.pgcat = pgcat
|
||||
struct.primary = primary
|
||||
struct.all_databases = [primary]
|
||||
end
|
||||
end
|
||||
|
||||
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
|
||||
@@ -111,6 +111,6 @@ class PgcatProcess
|
||||
username = cfg["pools"][first_pool_name]["users"]["0"]["username"]
|
||||
password = cfg["pools"][first_pool_name]["users"]["0"]["password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}"
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}?application_name=example_app"
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user