Compare commits

..

12 Commits

Author SHA1 Message Date
Lev Kokotov
a16699063e fmt 2023-06-17 08:57:19 -07:00
Juraj Bubniak
473bb3d17d Log not implemented messages as debug in prometheus metrics. (#477) 2023-06-16 18:48:38 -07:00
Lev Kokotov
c7d6273037 Support for prepared statements (#474)
* Start prepared statements

* parse

* Ok

* optional

* dont rewrite anonymous prepared stmts

* Dont rewrite anonymous prep statements

* hm?

* prep statements

* I see!

* comment

* Print config value

* Rewrite bind and add sqlx test

* fmt

* ok

* Fix

* Fix stats

* its late

* clean up PREPARE
2023-06-16 12:57:44 -07:00
Jeff Chen
94c781881f Report min_pool_size correctly (#471) 2023-06-12 09:23:56 -07:00
dependabot[bot]
a8c81e5df6 chore(deps): bump pin-project from 1.0.12 to 1.1.0 (#440)
Bumps [pin-project](https://github.com/taiki-e/pin-project) from 1.0.12 to 1.1.0.
- [Release notes](https://github.com/taiki-e/pin-project/releases)
- [Changelog](https://github.com/taiki-e/pin-project/blob/main/CHANGELOG.md)
- [Commits](https://github.com/taiki-e/pin-project/compare/v1.0.12...v1.1.0)

---
updated-dependencies:
- dependency-name: pin-project
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-12 00:51:24 -07:00
dependabot[bot]
1d3746ec9e chore(deps): bump sqlparser from 0.33.0 to 0.34.0 (#448)
Bumps [sqlparser](https://github.com/sqlparser-rs/sqlparser-rs) from 0.33.0 to 0.34.0.
- [Changelog](https://github.com/sqlparser-rs/sqlparser-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/sqlparser-rs/sqlparser-rs/compare/v0.33.0...v0.34.0)

---
updated-dependencies:
- dependency-name: sqlparser
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-12 00:51:05 -07:00
dependabot[bot]
b5489dc1e6 chore(deps): bump regex from 1.8.1 to 1.8.4 (#466)
Bumps [regex](https://github.com/rust-lang/regex) from 1.8.1 to 1.8.4.
- [Release notes](https://github.com/rust-lang/regex/releases)
- [Changelog](https://github.com/rust-lang/regex/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/regex/compare/1.8.1...1.8.4)

---
updated-dependencies:
- dependency-name: regex
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-12 00:50:46 -07:00
dependabot[bot]
557b425fb1 chore(deps): bump log from 0.4.17 to 0.4.19 (#470)
Bumps [log](https://github.com/rust-lang/log) from 0.4.17 to 0.4.19.
- [Release notes](https://github.com/rust-lang/log/releases)
- [Changelog](https://github.com/rust-lang/log/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/log/compare/0.4.17...0.4.19)

---
updated-dependencies:
- dependency-name: log
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-12 00:50:26 -07:00
Zain Kabani
aca9738821 Make queue strategy configurable and default to Fifo (#463)
* Change idle timeout default to 10 minutes

* Revert lifo for now while we investigate connection thrashing issues

* Make queue strategy configurable

* test revert idle time out

* Add pgcat start to python test
2023-06-09 11:35:20 -07:00
Zain Kabani
0bc453a771 Change default server lifetime and bump bb8 version to use LIFO correctly (#462)
Change default server lifetime and idle timeouts and bump bb8 version to use LIFO correctly
2023-05-31 08:25:42 -07:00
Zain Kabani
b67c33b6d0 Use latest bb8 and use Lifo as the queue strategy in the pool (#455)
* Use git bb8

* Use latest bb8 and change pool is use stack
2023-05-28 19:46:13 -07:00
Mostafa Abdelraouf
a8a30ad43b Refactor Pool Stats to be based off of Server/Client stats (#445)
What is wrong
Stats reported by SHOW POOLS seem to be leaking. We see lingering cl_idle , cl_waiting, and similarly for sv_idle , sv_active. We confirmed that these are reporting issues not actual lingering clients.

This behavior is readily reproducible by running

while true; do
    psql "postgres://sharding_user:sharding_user@localhost:6432/sharded_db" -c "SELECT 1" > /dev/null 2>&1  &
done

Why it happens
I wasn't able to get to figure our the reason for the bug but my best guess is that we have race conditions when updating pool-level stats. So even though individual update operations are atomic, we perform a check then update sequence which is not protected by a guard.
https://github.com/postgresml/pgcat/blob/main/src/stats/pool.rs#L174-L179

I am also suspecting that using Relaxed ordering might allow this behavior (I changed all operations to use Ordering::SeqCst but still got lingering clients)

How to fix
Since SHOW POOLS/SHOW SERVER/SHOW CLIENTS all show the current state of the proxy (as opposed to SHOW STATS which show aggregate values), this PR refactors SHOW POOLS to have it construct the results directly from SHOW SERVER and SHOW CLIENT datasets. This reduces the complexity of stat updates and eliminates the need for having locks when updating pool stats as we only care about updating individual client/server states.

This will change the semantics of maxwait, so instead of it holding the maxwait time ever encountered by a client (connected or disconnected), it will only consider connected clients which should be okay given PgCat tends to hold on to client connections more than Pgbouncer.
2023-05-23 08:44:49 -05:00
25 changed files with 2560 additions and 774 deletions

43
Cargo.lock generated
View File

@@ -83,9 +83,9 @@ checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "bb8"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1627eccf3aa91405435ba240be23513eeca466b5dc33866422672264de061582"
checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2"
dependencies = [
"async-trait",
"futures-channel",
@@ -314,12 +314,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fnv"
version = "1.0.7"
@@ -759,12 +753,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.17"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
]
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "lru-cache"
@@ -914,7 +905,7 @@ dependencies = [
"chrono",
"env_logger",
"exitcode",
"fallible-iterator 0.3.0",
"fallible-iterator",
"futures",
"hmac",
"hyper",
@@ -993,22 +984,22 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.0.12"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.9",
]
[[package]]
@@ -1032,7 +1023,7 @@ dependencies = [
"base64",
"byteorder",
"bytes",
"fallible-iterator 0.2.0",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
@@ -1112,9 +1103,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.8.1"
version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370"
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
dependencies = [
"aho-corasick",
"memchr",
@@ -1123,9 +1114,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]]
name = "resolv-conf"
@@ -1335,9 +1326,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.33.0"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
dependencies = [
"log",
"sqlparser_derive",

View File

@@ -8,7 +8,7 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
bytes = "1"
md-5 = "0.10"
bb8 = "0.8.0"
bb8 = "0.8.1"
async-trait = "0.1"
rand = "0.8"
chrono = "0.4"
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = {version = "0.33", features = ["visitor"] }
sqlparser = {version = "0.34", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
env_logger = "0.10"
@@ -38,7 +38,7 @@ socket2 = { version = "0.4.7", features = ["all"] }
nix = "0.26.2"
atomic_enum = "0.2.0"
postgres-protocol = "0.6.5"
fallible-iterator = "0.3"
fallible-iterator = "0.2"
pin-project = "1"
webpki-roots = "0.23"
rustls = { version = "0.21", features = ["dangerous_configuration"] }

View File

@@ -1,4 +1,4 @@
FROM rust:bullseye
FROM rust:1.70-bullseye
# Dependencies
RUN apt-get update -y \

View File

@@ -60,6 +60,9 @@ tcp_keepalives_count = 5
# Number of seconds between keepalive packets.
tcp_keepalives_interval = 5
# Handle prepared statements.
prepared_statements = true
# Path to TLS Certificate file to use for TLS connections
# tls_certificate = ".circleci/server.cert"
# Path to TLS private key file to use for TLS connections

View File

@@ -1,4 +1,5 @@
use crate::pool::BanReason;
use crate::stats::pool::PoolStats;
use bytes::{Buf, BufMut, BytesMut};
use log::{error, info, trace};
use nix::sys::signal::{self, Signal};
@@ -14,7 +15,7 @@ use crate::errors::Error;
use crate::messages::*;
use crate::pool::ClientServerMap;
use crate::pool::{get_all_pools, get_pool};
use crate::stats::{get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState};
use crate::stats::{get_client_stats, get_server_stats, ClientState, ServerState};
pub fn generate_server_info_for_admin() -> BytesMut {
let mut server_info = BytesMut::new();
@@ -254,39 +255,12 @@ async fn show_pools<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let all_pool_stats = get_pool_stats();
let columns = vec![
("database", DataType::Text),
("user", DataType::Text),
("pool_mode", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric),
("sv_active", DataType::Numeric),
("sv_idle", DataType::Numeric),
("sv_used", DataType::Numeric),
("sv_tested", DataType::Numeric),
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
let pool_lookup = PoolStats::construct_pool_lookup();
let mut res = BytesMut::new();
res.put(row_description(&columns));
for ((_user_pool, _pool), pool_stats) in all_pool_stats {
let mut row = vec![
pool_stats.database(),
pool_stats.user(),
pool_stats.pool_mode().to_string(),
];
pool_stats.populate_row(&mut row);
pool_stats.clear_maxwait();
res.put(data_row(&row));
}
res.put(row_description(&PoolStats::generate_header()));
pool_lookup.iter().for_each(|(_identifier, pool_stats)| {
res.put(data_row(&pool_stats.generate_row()));
});
res.put(command_complete("SHOW"));
// ReadyForQuery
@@ -334,17 +308,17 @@ where
let paused = pool.paused();
res.put(data_row(&vec![
address.name(), // name
address.host.to_string(), // host
address.port.to_string(), // port
database_name.to_string(), // database
pool_config.user.username.to_string(), // force_user
pool_config.user.pool_size.to_string(), // pool_size
"0".to_string(), // min_pool_size
"0".to_string(), // reserve_pool
pool_config.pool_mode.to_string(), // pool_mode
pool_config.user.pool_size.to_string(), // max_connections
pool_state.connections.to_string(), // current_connections
address.name(), // name
address.host.to_string(), // host
address.port.to_string(), // port
database_name.to_string(), // database
pool_config.user.username.to_string(), // force_user
pool_config.user.pool_size.to_string(), // pool_size
pool_config.user.min_pool_size.unwrap_or(0).to_string(), // min_pool_size
"0".to_string(), // reserve_pool
pool_config.pool_mode.to_string(), // pool_mode
pool_config.user.pool_size.to_string(), // max_connections
pool_state.connections.to_string(), // current_connections
match paused {
// paused
true => "1".to_string(),
@@ -725,6 +699,8 @@ where
("bytes_sent", DataType::Numeric),
("bytes_received", DataType::Numeric),
("age_seconds", DataType::Numeric),
("prepare_cache_hit", DataType::Numeric),
("prepare_cache_miss", DataType::Numeric),
];
let new_map = get_server_stats();
@@ -748,6 +724,14 @@ where
.duration_since(server.connect_time())
.as_secs()
.to_string(),
server
.prepared_hit_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_miss_count
.load(Ordering::Relaxed)
.to_string(),
];
res.put(data_row(&row));

View File

@@ -3,8 +3,9 @@ use crate::pool::BanReason;
/// Handle clients by pretending to be a PostgreSQL server.
use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error, info, trace, warn};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Instant;
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
@@ -13,18 +14,25 @@ use tokio::sync::mpsc::Sender;
use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::auth_passthrough::refetch_auth_hash;
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
use crate::config::{
get_config, get_idle_client_in_transaction_timeout, get_prepared_statements, Address, PoolMode,
};
use crate::constants::*;
use crate::messages::*;
use crate::plugins::PluginOutput;
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
use crate::query_router::{Command, QueryRouter};
use crate::server::Server;
use crate::stats::{ClientStats, PoolStats, ServerStats};
use crate::stats::{ClientStats, ServerStats};
use crate::tls::Tls;
use tokio_rustls::server::TlsStream;
/// Incrementally count prepared statements
/// to avoid random conflicts in places where the random number generator is weak.
pub static PREPARED_STATEMENT_COUNTER: Lazy<Arc<AtomicUsize>> =
Lazy::new(|| Arc::new(AtomicUsize::new(0)));
/// Type of connection received from client.
enum ClientConnectionType {
Startup,
@@ -93,6 +101,9 @@ pub struct Client<S, T> {
/// Used to notify clients about an impending shutdown
shutdown: Receiver<()>,
/// Prepared statements
prepared_statements: HashMap<String, Parse>,
}
/// Client entrypoint.
@@ -654,24 +665,12 @@ where
ready_for_query(&mut write).await?;
trace!("Startup OK");
let pool_stats = match get_pool(pool_name, username) {
Some(pool) => {
if !admin {
pool.stats
} else {
Arc::new(PoolStats::default())
}
}
None => Arc::new(PoolStats::default()),
};
let stats = Arc::new(ClientStats::new(
process_id,
application_name,
username,
pool_name,
tokio::time::Instant::now(),
pool_stats,
));
Ok(Client {
@@ -694,6 +693,7 @@ where
application_name: application_name.to_string(),
shutdown,
connected_to_server: false,
prepared_statements: HashMap::new(),
})
}
@@ -728,6 +728,7 @@ where
application_name: String::from("undefined"),
shutdown,
connected_to_server: false,
prepared_statements: HashMap::new(),
})
}
@@ -769,6 +770,10 @@ where
// Result returned by one of the plugins.
let mut plugin_output = None;
// Prepared statement being executed
let mut prepared_statement = None;
let mut will_prepare = false;
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
@@ -778,13 +783,16 @@ where
self.transaction_mode
);
// Should we rewrite prepared statements and bind messages?
let mut prepared_statements_enabled = get_prepared_statements();
// Read a complete message from the client, which normally would be
// either a `Q` (query) or `P` (prepare, extended protocol).
// We can parse it here before grabbing a server from the pool,
// in case the client is sending some custom protocol messages, e.g.
// SET SHARDING KEY TO 'bigint';
let message = tokio::select! {
let mut message = tokio::select! {
_ = self.shutdown.recv() => {
if !self.admin {
error_response_terminal(
@@ -812,7 +820,21 @@ where
// allocate a connection, we wouldn't be able to send back an error message
// to the client so we buffer them and defer the decision to error out or not
// to when we get the S message
'D' | 'E' => {
'D' => {
if prepared_statements_enabled {
let name;
(name, message) = self.rewrite_describe(message).await?;
if let Some(name) = name {
prepared_statement = Some(name);
}
}
self.buffer.put(&message[..]);
continue;
}
'E' => {
self.buffer.put(&message[..]);
continue;
}
@@ -842,6 +864,11 @@ where
}
'P' => {
if prepared_statements_enabled {
(prepared_statement, message) = self.rewrite_parse(message)?;
will_prepare = true;
}
self.buffer.put(&message[..]);
if query_router.query_parser_enabled() {
@@ -858,6 +885,10 @@ where
}
'B' => {
if prepared_statements_enabled {
(prepared_statement, message) = self.rewrite_bind(message).await?;
}
self.buffer.put(&message[..]);
if query_router.query_parser_enabled() {
@@ -1066,7 +1097,48 @@ where
// If the client is in session mode, no more custom protocol
// commands will be accepted.
loop {
let message = match initial_message {
// Only check if we should rewrite prepared statements
// in session mode. In transaction mode, we check at the beginning of
// each transaction.
if !self.transaction_mode {
prepared_statements_enabled = get_prepared_statements();
}
debug!("Prepared statement active: {:?}", prepared_statement);
// We are processing a prepared statement.
if let Some(ref name) = prepared_statement {
debug!("Checking prepared statement is on server");
// Get the prepared statement the server expects to see.
let statement = match self.prepared_statements.get(name) {
Some(statement) => {
debug!("Prepared statement `{}` found in cache", name);
statement
}
None => {
return Err(Error::ClientError(format!(
"prepared statement `{}` not found",
name
)))
}
};
// Since it's already in the buffer, we don't need to prepare it on this server.
if will_prepare {
server.will_prepare(&statement.name);
will_prepare = false;
} else {
// The statement is not prepared on the server, so we need to prepare it.
if server.should_prepare(&statement.name) {
server.prepare(statement).await?;
}
}
// Done processing the prepared statement.
prepared_statement = None;
}
let mut message = match initial_message {
None => {
trace!("Waiting for message inside transaction or in session mode");
@@ -1185,6 +1257,11 @@ where
// Parse
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
'P' => {
if prepared_statements_enabled {
(prepared_statement, message) = self.rewrite_parse(message)?;
will_prepare = true;
}
if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
if let Ok(output) = query_router.execute_plugins(&ast).await {
@@ -1199,12 +1276,25 @@ where
// Bind
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
'B' => {
if prepared_statements_enabled {
(prepared_statement, message) = self.rewrite_bind(message).await?;
}
self.buffer.put(&message[..]);
}
// Describe
// Command a client can issue to describe a previously prepared named statement.
'D' => {
if prepared_statements_enabled {
let name;
(name, message) = self.rewrite_describe(message).await?;
if let Some(name) = name {
prepared_statement = Some(name);
}
}
self.buffer.put(&message[..]);
}
@@ -1247,7 +1337,7 @@ where
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
// Almost certainly true
if first_message_code == 'P' {
if first_message_code == 'P' && !prepared_statements_enabled {
// Message layout
// P followed by 32 int followed by null-terminated statement name
// So message code should be in offset 0 of the buffer, first character
@@ -1375,6 +1465,107 @@ where
}
}
/// Rewrite Parse (F) message to set the prepared statement name to one we control.
/// Save it into the client cache.
fn rewrite_parse(&mut self, message: BytesMut) -> Result<(Option<String>, BytesMut), Error> {
let parse: Parse = (&message).try_into()?;
let name = parse.name.clone();
// Don't rewrite anonymous prepared statements
if parse.anonymous() {
debug!("Anonymous prepared statement");
return Ok((None, message));
}
let parse = parse.rename();
debug!(
"Renamed prepared statement `{}` to `{}` and saved to cache",
name, parse.name
);
self.prepared_statements.insert(name.clone(), parse.clone());
Ok((Some(name), parse.try_into()?))
}
/// Rewrite the Bind (F) message to use the prepared statement name
/// saved in the client cache.
async fn rewrite_bind(
&mut self,
message: BytesMut,
) -> Result<(Option<String>, BytesMut), Error> {
let bind: Bind = (&message).try_into()?;
let name = bind.prepared_statement.clone();
if bind.anonymous() {
debug!("Anonymous bind message");
return Ok((None, message));
}
match self.prepared_statements.get(&name) {
Some(prepared_stmt) => {
let bind = bind.reassign(prepared_stmt);
debug!("Rewrote bind `{}` to `{}`", name, bind.prepared_statement);
Ok((Some(name), bind.try_into()?))
}
None => {
debug!("Got bind for unknown prepared statement {:?}", bind);
error_response(
&mut self.write,
&format!(
"prepared statement \"{}\" does not exist",
bind.prepared_statement
),
)
.await?;
Err(Error::ClientError(format!(
"Prepared statement `{}` doesn't exist",
name
)))
}
}
}
/// Rewrite the Describe (F) message to use the prepared statement name
/// saved in the client cache.
async fn rewrite_describe(
&mut self,
message: BytesMut,
) -> Result<(Option<String>, BytesMut), Error> {
let describe: Describe = (&message).try_into()?;
let name = describe.statement_name.clone();
if describe.anonymous() {
debug!("Anonymous describe");
return Ok((None, message));
}
match self.prepared_statements.get(&name) {
Some(prepared_stmt) => {
let describe = describe.rename(&prepared_stmt.name);
debug!(
"Rewrote describe `{}` to `{}`",
name, describe.statement_name
);
Ok((Some(name), describe.try_into()?))
}
None => {
debug!("Got describe for unknown prepared statement {:?}", describe);
Ok((None, message))
}
}
}
/// Release the server from the client: it can't cancel its queries anymore.
pub fn release(&self) {
let mut guard = self.client_server_map.lock();

View File

@@ -292,6 +292,9 @@ pub struct General {
#[serde(default = "General::default_server_lifetime")]
pub server_lifetime: u64,
#[serde(default = "General::default_server_round_robin")] // False
pub server_round_robin: bool,
#[serde(default = "General::default_worker_threads")]
pub worker_threads: usize,
@@ -317,6 +320,9 @@ pub struct General {
pub auth_query: Option<String>,
pub auth_query_user: Option<String>,
pub auth_query_password: Option<String>,
#[serde(default)]
pub prepared_statements: bool,
}
impl General {
@@ -329,7 +335,7 @@ impl General {
}
pub fn default_server_lifetime() -> u64 {
1000 * 60 * 60 * 24 // 24 hours
1000 * 60 * 60 // 1 hour
}
pub fn default_connect_timeout() -> u64 {
@@ -352,7 +358,7 @@ impl General {
}
pub fn default_idle_timeout() -> u64 {
60000 // 10 minutes
600000 // 10 minutes
}
pub fn default_shutdown_timeout() -> u64 {
@@ -390,6 +396,10 @@ impl General {
pub fn default_prometheus_exporter_port() -> i16 {
9930
}
pub fn default_server_round_robin() -> bool {
true
}
}
impl Default for General {
@@ -424,8 +434,10 @@ impl Default for General {
auth_query: None,
auth_query_user: None,
auth_query_password: None,
server_lifetime: 1000 * 3600 * 24, // 24 hours,
server_lifetime: Self::default_server_lifetime(),
server_round_robin: false,
validate_config: true,
prepared_statements: false,
}
}
}
@@ -983,6 +995,7 @@ impl Config {
"Default max server lifetime: {}ms",
self.general.server_lifetime
);
info!("Sever round robin: {}", self.general.server_round_robin);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);
@@ -1006,6 +1019,7 @@ impl Config {
"Server TLS certificate verification: {}",
self.general.verify_server_certificate
);
info!("Prepared statements: {}", self.general.prepared_statements);
info!(
"Plugins: {}",
match self.plugins {
@@ -1230,6 +1244,10 @@ pub fn get_idle_client_in_transaction_timeout() -> u64 {
.idle_client_in_transaction_timeout
}
pub fn get_prepared_statements() -> bool {
(*(*CONFIG.load())).general.prepared_statements
}
/// Parse the configuration file located at the path.
pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new();

View File

@@ -26,6 +26,7 @@ pub enum Error {
AuthPassthroughError(String),
UnsupportedStatement,
QueryRouterParserError(String),
QueryRouterError(String),
}
#[derive(Clone, PartialEq, Debug)]
@@ -121,3 +122,9 @@ impl std::fmt::Display for Error {
}
}
}
impl From<std::ffi::NulError> for Error {
fn from(err: std::ffi::NulError) -> Self {
Error::QueryRouterError(err.to_string())
}
}

View File

@@ -7,11 +7,15 @@ use socket2::{SockRef, TcpKeepalive};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use crate::client::PREPARED_STATEMENT_COUNTER;
use crate::config::get_config;
use crate::errors::Error;
use std::collections::HashMap;
use std::ffi::CString;
use std::io::{BufRead, Cursor};
use std::mem;
use std::sync::atomic::Ordering;
use std::time::Duration;
/// Postgres data type mappings
@@ -526,6 +530,13 @@ pub fn command_complete(command: &str) -> BytesMut {
res
}
pub fn flush() -> BytesMut {
let mut bytes = BytesMut::new();
bytes.put_u8(b'H');
bytes.put_i32(4);
bytes
}
/// Write all data in the buffer to the TcpStream.
pub async fn write_all<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
where
@@ -689,3 +700,285 @@ impl BytesMutReader for Cursor<&BytesMut> {
}
}
}
/// Parse (F) message.
/// See: <https://www.postgresql.org/docs/current/protocol-message-formats.html>
#[derive(Clone, Debug)]
pub struct Parse {
code: char,
#[allow(dead_code)]
len: i32,
pub name: String,
pub generated_name: String,
query: String,
num_params: i16,
param_types: Vec<i32>,
}
impl TryFrom<&BytesMut> for Parse {
type Error = Error;
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
let mut cursor = Cursor::new(buf);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let name = cursor.read_string()?;
let query = cursor.read_string()?;
let num_params = cursor.get_i16();
let mut param_types = Vec::new();
for _ in 0..num_params {
param_types.push(cursor.get_i32());
}
Ok(Parse {
code,
len,
name,
generated_name: prepared_statement_name(),
query,
num_params,
param_types,
})
}
}
impl TryFrom<Parse> for BytesMut {
type Error = Error;
fn try_from(parse: Parse) -> Result<BytesMut, Error> {
let mut bytes = BytesMut::new();
let name_binding = CString::new(parse.name)?;
let name = name_binding.as_bytes_with_nul();
let query_binding = CString::new(parse.query)?;
let query = query_binding.as_bytes_with_nul();
// Recompute length of the message.
let len = 4 // self
+ name.len()
+ query.len()
+ 2
+ 4 * parse.num_params as usize;
bytes.put_u8(parse.code as u8);
bytes.put_i32(len as i32);
bytes.put_slice(name);
bytes.put_slice(query);
bytes.put_i16(parse.num_params);
for param in parse.param_types {
bytes.put_i32(param);
}
Ok(bytes)
}
}
impl TryFrom<&Parse> for BytesMut {
type Error = Error;
fn try_from(parse: &Parse) -> Result<BytesMut, Error> {
parse.clone().try_into()
}
}
impl Parse {
pub fn rename(mut self) -> Self {
self.name = self.generated_name.to_string();
self
}
pub fn anonymous(&self) -> bool {
self.name.is_empty()
}
}
/// Bind (B) message.
/// See: <https://www.postgresql.org/docs/current/protocol-message-formats.html>
#[derive(Clone, Debug)]
pub struct Bind {
code: char,
#[allow(dead_code)]
len: i64,
portal: String,
pub prepared_statement: String,
num_param_format_codes: i16,
param_format_codes: Vec<i16>,
num_param_values: i16,
param_values: Vec<(i32, BytesMut)>,
num_result_column_format_codes: i16,
result_columns_format_codes: Vec<i16>,
}
impl TryFrom<&BytesMut> for Bind {
type Error = Error;
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
let mut cursor = Cursor::new(buf);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let portal = cursor.read_string()?;
let prepared_statement = cursor.read_string()?;
let num_param_format_codes = cursor.get_i16();
let mut param_format_codes = Vec::new();
for _ in 0..num_param_format_codes {
param_format_codes.push(cursor.get_i16());
}
let num_param_values = cursor.get_i16();
let mut param_values = Vec::new();
for _ in 0..num_param_values {
let param_len = cursor.get_i32();
let mut param = BytesMut::with_capacity(param_len as usize);
param.resize(param_len as usize, b'0');
cursor.copy_to_slice(&mut param);
param_values.push((param_len, param));
}
let num_result_column_format_codes = cursor.get_i16();
let mut result_columns_format_codes = Vec::new();
for _ in 0..num_result_column_format_codes {
result_columns_format_codes.push(cursor.get_i16());
}
Ok(Bind {
code,
len: len as i64,
portal,
prepared_statement,
num_param_format_codes,
param_format_codes,
num_param_values,
param_values,
num_result_column_format_codes,
result_columns_format_codes,
})
}
}
impl TryFrom<Bind> for BytesMut {
type Error = Error;
fn try_from(bind: Bind) -> Result<BytesMut, Error> {
let mut bytes = BytesMut::new();
let portal_binding = CString::new(bind.portal)?;
let portal = portal_binding.as_bytes_with_nul();
let prepared_statement_binding = CString::new(bind.prepared_statement)?;
let prepared_statement = prepared_statement_binding.as_bytes_with_nul();
let mut len = 4 // self
+ portal.len()
+ prepared_statement.len()
+ 2 // num_param_format_codes
+ 2 * bind.num_param_format_codes as usize // num_param_format_codes
+ 2; // num_param_values
for (param_len, _) in &bind.param_values {
len += 4 + *param_len as usize;
}
len += 2; // num_result_column_format_codes
len += 2 * bind.num_result_column_format_codes as usize;
bytes.put_u8(bind.code as u8);
bytes.put_i32(len as i32);
bytes.put_slice(portal);
bytes.put_slice(prepared_statement);
bytes.put_i16(bind.num_param_format_codes);
for param_format_code in bind.param_format_codes {
bytes.put_i16(param_format_code);
}
bytes.put_i16(bind.num_param_values);
for (param_len, param) in bind.param_values {
bytes.put_i32(param_len);
bytes.put_slice(&param);
}
bytes.put_i16(bind.num_result_column_format_codes);
for result_column_format_code in bind.result_columns_format_codes {
bytes.put_i16(result_column_format_code);
}
Ok(bytes)
}
}
impl Bind {
pub fn reassign(mut self, parse: &Parse) -> Self {
self.prepared_statement = parse.name.clone();
self
}
pub fn anonymous(&self) -> bool {
self.prepared_statement.is_empty()
}
}
#[derive(Debug, Clone)]
pub struct Describe {
code: char,
#[allow(dead_code)]
len: i32,
target: char,
pub statement_name: String,
}
impl TryFrom<&BytesMut> for Describe {
type Error = Error;
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
let mut cursor = Cursor::new(bytes);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let target = cursor.get_u8() as char;
let statement_name = cursor.read_string()?;
Ok(Describe {
code,
len,
target,
statement_name,
})
}
}
impl TryFrom<Describe> for BytesMut {
type Error = Error;
fn try_from(describe: Describe) -> Result<BytesMut, Error> {
let mut bytes = BytesMut::new();
let statement_name_binding = CString::new(describe.statement_name)?;
let statement_name = statement_name_binding.as_bytes_with_nul();
let len = 4 + 1 + statement_name.len();
bytes.put_u8(describe.code as u8);
bytes.put_i32(len as i32);
bytes.put_u8(describe.target as u8);
bytes.put_slice(statement_name);
Ok(bytes)
}
}
impl Describe {
pub fn rename(mut self, name: &str) -> Self {
self.statement_name = name.to_string();
self
}
pub fn anonymous(&self) -> bool {
self.statement_name.is_empty()
}
}
pub fn prepared_statement_name() -> String {
format!(
"P_{}",
PREPARED_STATEMENT_COUNTER.fetch_add(1, Ordering::SeqCst)
)
}

View File

@@ -7,8 +7,7 @@ use bytes::{Bytes, BytesMut};
use parking_lot::RwLock;
use crate::config::{get_config, Address, Role, User};
use crate::pool::{ClientServerMap, PoolIdentifier, ServerPool};
use crate::stats::PoolStats;
use crate::pool::{ClientServerMap, ServerPool};
use log::{error, info, trace, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -24,7 +23,7 @@ impl MirroredClient {
async fn create_pool(&self) -> Pool<ServerPool> {
let config = get_config();
let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
let (connection_timeout, idle_timeout, cfg) =
let (connection_timeout, idle_timeout, _cfg) =
match config.pools.get(&self.address.pool_name) {
Some(cfg) => (
cfg.connect_timeout.unwrap_or(default),
@@ -34,14 +33,11 @@ impl MirroredClient {
None => (default, default, crate::config::Pool::default()),
};
let identifier = PoolIdentifier::new(&self.database, &self.user.username);
let manager = ServerPool::new(
self.address.clone(),
self.user.clone(),
self.database.as_str(),
ClientServerMap::default(),
Arc::new(PoolStats::new(identifier, cfg.clone())),
Arc::new(RwLock::new(None)),
None,
true,

View File

@@ -1,6 +1,6 @@
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bb8::{ManageConnection, Pool, PooledConnection};
use bb8::{ManageConnection, Pool, PooledConnection, QueueStrategy};
use bytes::{BufMut, BytesMut};
use chrono::naive::NaiveDateTime;
use log::{debug, error, info, warn};
@@ -10,6 +10,7 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use regex::Regex;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
@@ -26,7 +27,7 @@ use crate::auth_passthrough::AuthPassthrough;
use crate::plugins::prewarmer;
use crate::server::Server;
use crate::sharding::ShardingFunction;
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
use crate::stats::{AddressStats, ClientStats, ServerStats};
pub type ProcessId = i32;
pub type SecretKey = i32;
@@ -76,6 +77,12 @@ impl PoolIdentifier {
}
}
impl Display for PoolIdentifier {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}@{}", self.user, self.db)
}
}
impl From<&Address> for PoolIdentifier {
fn from(address: &Address) -> PoolIdentifier {
PoolIdentifier::new(&address.database, &address.username)
@@ -202,9 +209,6 @@ pub struct ConnectionPool {
paused: Arc<AtomicBool>,
paused_waiter: Arc<Notify>,
/// Statistics.
pub stats: Arc<PoolStats>,
/// AuthInfo
pub auth_hash: Arc<RwLock<Option<String>>>,
}
@@ -254,10 +258,6 @@ impl ConnectionPool {
.clone()
.into_keys()
.collect::<Vec<String>>();
let pool_stats = Arc::new(PoolStats::new(identifier, pool_config.clone()));
// Allow the pool to be seen in statistics
pool_stats.register(pool_stats.clone());
// Sort by shard number to ensure consistency.
shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap());
@@ -358,7 +358,6 @@ impl ConnectionPool {
user.clone(),
&shard.database,
client_server_map.clone(),
pool_stats.clone(),
pool_auth_hash.clone(),
match pool_config.plugins {
Some(ref plugins) => Some(plugins.clone()),
@@ -390,6 +389,11 @@ impl ConnectionPool {
.min()
.unwrap();
let queue_strategy = match config.general.server_round_robin {
true => QueueStrategy::Fifo,
false => QueueStrategy::Lifo,
};
debug!(
"[pool: {}][user: {}] Pool reaper rate: {}ms",
pool_name, user.username, reaper_rate
@@ -402,6 +406,7 @@ impl ConnectionPool {
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
.reaper_rate(std::time::Duration::from_millis(reaper_rate))
.queue_strategy(queue_strategy)
.test_on_check_out(false);
let pool = if config.general.validate_config {
@@ -429,7 +434,6 @@ impl ConnectionPool {
let pool = ConnectionPool {
databases: shards,
stats: pool_stats,
addresses,
banlist: Arc::new(RwLock::new(banlist)),
config_hash: new_pool_hash_value,
@@ -610,6 +614,10 @@ impl ConnectionPool {
});
}
// Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
client_stats.waiting();
while !candidates.is_empty() {
// Get the next candidate
let address = match candidates.pop() {
@@ -628,10 +636,6 @@ impl ConnectionPool {
}
}
// Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
client_stats.waiting();
// Check if we can connect
let mut conn = match self.databases[address.shard][address.address_index]
.get()
@@ -669,7 +673,7 @@ impl ConnectionPool {
.stats()
.checkout_time(checkout_time, client_stats.application_name());
server.stats().active(client_stats.application_name());
client_stats.active();
return Ok((conn, address.clone()));
}
@@ -677,11 +681,19 @@ impl ConnectionPool {
.run_health_check(address, server, now, client_stats)
.await
{
let checkout_time: u64 = now.elapsed().as_micros() as u64;
client_stats.checkout_time(checkout_time);
server
.stats()
.checkout_time(checkout_time, client_stats.application_name());
server.stats().active(client_stats.application_name());
client_stats.active();
return Ok((conn, address.clone()));
} else {
continue;
}
}
client_stats.idle();
Err(Error::AllServersDown)
}
@@ -927,9 +939,6 @@ pub struct ServerPool {
/// Client/server mapping.
client_server_map: ClientServerMap,
/// Server statistics.
stats: Arc<PoolStats>,
/// Server auth hash (for auth passthrough).
auth_hash: Arc<RwLock<Option<String>>>,
@@ -946,7 +955,6 @@ impl ServerPool {
user: User,
database: &str,
client_server_map: ClientServerMap,
stats: Arc<PoolStats>,
auth_hash: Arc<RwLock<Option<String>>>,
plugins: Option<Plugins>,
cleanup_connections: bool,
@@ -956,7 +964,6 @@ impl ServerPool {
user: user.clone(),
database: database.to_string(),
client_server_map,
stats,
auth_hash,
plugins,
cleanup_connections,
@@ -975,7 +982,6 @@ impl ManageConnection for ServerPool {
let stats = Arc::new(ServerStats::new(
self.address.clone(),
self.stats.clone(),
tokio::time::Instant::now(),
));

View File

@@ -1,6 +1,6 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{error, info, warn};
use log::{debug, error, info};
use phf::phf_map;
use std::collections::HashMap;
use std::fmt;
@@ -9,8 +9,9 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::config::Address;
use crate::pool::get_all_pools;
use crate::stats::{get_pool_stats, get_server_stats, ServerStats};
use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};
struct MetricHelpType {
help: &'static str,
@@ -233,10 +234,10 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("stats_{}", name), value, labels)
}
fn from_pool(pool: &(String, String), name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
fn from_pool(pool_id: PoolIdentifier, name: &str, value: u64) -> Option<PrometheusMetric<u64>> {
let mut labels = HashMap::new();
labels.insert("pool", pool.0.clone());
labels.insert("user", pool.1.clone());
labels.insert("pool", pool_id.db);
labels.insert("user", pool_id.user);
Self::from_name(&format!("pools_{}", name), value, labels)
}
@@ -274,7 +275,7 @@ fn push_address_stats(lines: &mut Vec<String>) {
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
debug!("Metric {} not implemented for {}", key, address.name());
}
}
}
@@ -284,18 +285,15 @@ fn push_address_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let pool_stats = get_pool_stats();
for (pool, stats) in pool_stats.iter() {
let stats = &**stats;
let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() {
if let Some(prometheus_metric) = PrometheusMetric::<u64>::from_pool(pool, &name, value)
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{
lines.push(prometheus_metric.to_string());
} else {
warn!(
"Metric {} not implemented for ({},{})",
name, pool.0, pool.1
);
debug!("Metric {} not implemented for ({})", name, *pool_id);
}
}
}
@@ -320,7 +318,7 @@ fn push_database_stats(lines: &mut Vec<String>) {
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
debug!("Metric {} not implemented for {}", key, address.name());
}
}
}

View File

@@ -331,7 +331,7 @@ impl QueryRouter {
Some((command, value))
}
pub fn parse(message: &BytesMut) -> Result<Vec<sqlparser::ast::Statement>, Error> {
pub fn parse(message: &BytesMut) -> Result<Vec<Statement>, Error> {
let mut message_cursor = Cursor::new(message);
let code = message_cursor.get_u8() as char;
@@ -348,12 +348,13 @@ impl QueryRouter {
// Parse (prepared statement)
'P' => {
// Reads statement name
message_cursor.read_string().unwrap();
let _name = message_cursor.read_string().unwrap();
// Reads query string
let query = message_cursor.read_string().unwrap();
debug!("Prepared statement: '{}'", query);
query
}

View File

@@ -5,7 +5,7 @@ use fallible_iterator::FallibleIterator;
use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use postgres_protocol::message;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::io::Read;
use std::net::IpAddr;
use std::sync::Arc;
@@ -198,6 +198,9 @@ pub struct Server {
/// Should clean up dirty connections?
cleanup_connections: bool,
/// Prepared statements
prepared_statements: BTreeSet<String>,
}
impl Server {
@@ -692,6 +695,7 @@ impl Server {
)),
},
cleanup_connections,
prepared_statements: BTreeSet::new(),
};
server.set_name("pgcat").await?;
@@ -910,6 +914,43 @@ impl Server {
Ok(bytes)
}
pub fn will_prepare(&mut self, name: &str) {
debug!("Will prepare `{}`", name);
self.prepared_statements.insert(name.to_string());
}
pub fn should_prepare(&self, name: &str) -> bool {
let should_prepare = !self.prepared_statements.contains(name);
debug!("Should prepare `{}`: {}", name, should_prepare);
if should_prepare {
self.stats.prepared_cache_miss();
} else {
self.stats.prepared_cache_hit();
}
should_prepare
}
pub async fn prepare(&mut self, parse: &Parse) -> Result<(), Error> {
debug!("Preparing `{}`", parse.name);
let bytes: BytesMut = parse.try_into()?;
self.send(&bytes).await?;
self.send(&flush()).await?;
// Read and discard ParseComplete (B)
let _ = read_message(&mut self.stream).await?;
self.prepared_statements.insert(parse.name.to_string());
debug!("Prepared `{}`", parse.name);
Ok(())
}
/// If the server is still inside a transaction.
/// If the client disconnects while the server is in a transaction, we will clean it up.
pub fn in_transaction(&self) -> bool {

View File

@@ -1,4 +1,3 @@
use crate::pool::PoolIdentifier;
/// Statistics and reporting.
use arc_swap::ArcSwap;
@@ -16,13 +15,11 @@ pub mod pool;
pub mod server;
pub use address::AddressStats;
pub use client::{ClientState, ClientStats};
pub use pool::PoolStats;
pub use server::{ServerState, ServerStats};
/// Convenience types for various stats
type ClientStatesLookup = HashMap<i32, Arc<ClientStats>>;
type ServerStatesLookup = HashMap<i32, Arc<ServerStats>>;
type PoolStatsLookup = HashMap<(String, String), Arc<PoolStats>>;
/// Stats for individual client connections
/// Used in SHOW CLIENTS.
@@ -34,11 +31,6 @@ static CLIENT_STATS: Lazy<Arc<RwLock<ClientStatesLookup>>> =
static SERVER_STATS: Lazy<Arc<RwLock<ServerStatesLookup>>> =
Lazy::new(|| Arc::new(RwLock::new(ServerStatesLookup::default())));
/// Aggregate stats for each pool (a pool is identified by database name and username)
/// Used in SHOW POOLS.
static POOL_STATS: Lazy<Arc<RwLock<PoolStatsLookup>>> =
Lazy::new(|| Arc::new(RwLock::new(PoolStatsLookup::default())));
/// The statistics reporter. An instance is given to each possible source of statistics,
/// e.g. client stats, server stats, connection pool stats.
pub static REPORTER: Lazy<ArcSwap<Reporter>> =
@@ -80,13 +72,6 @@ impl Reporter {
fn server_disconnecting(&self, server_id: i32) {
SERVER_STATS.write().remove(&server_id);
}
/// Register a pool with the stats system.
fn pool_register(&self, identifier: PoolIdentifier, stats: Arc<PoolStats>) {
POOL_STATS
.write()
.insert((identifier.db, identifier.user), stats);
}
}
/// The statistics collector which used for calculating averages
@@ -139,12 +124,6 @@ pub fn get_server_stats() -> ServerStatesLookup {
SERVER_STATS.read().clone()
}
/// Get a snapshot of pool statistics.
/// by the `Collector`.
pub fn get_pool_stats() -> PoolStatsLookup {
POOL_STATS.read().clone()
}
/// Get the statistics reporter used to update stats across the pools/clients.
pub fn get_reporter() -> Reporter {
(*(*REPORTER.load())).clone()

View File

@@ -1,4 +1,3 @@
use super::PoolStats;
use super::{get_reporter, Reporter};
use atomic_enum::atomic_enum;
use std::sync::atomic::*;
@@ -34,12 +33,14 @@ pub struct ClientStats {
pool_name: String,
connect_time: Instant,
pool_stats: Arc<PoolStats>,
reporter: Reporter,
/// Total time spent waiting for a connection from pool, measures in microseconds
pub total_wait_time: Arc<AtomicU64>,
/// Maximum time spent waiting for a connection from pool, measures in microseconds
pub max_wait_time: Arc<AtomicU64>,
/// Current state of the client
pub state: Arc<AtomicClientState>,
@@ -61,8 +62,8 @@ impl Default for ClientStats {
application_name: String::new(),
username: String::new(),
pool_name: String::new(),
pool_stats: Arc::new(PoolStats::default()),
total_wait_time: Arc::new(AtomicU64::new(0)),
max_wait_time: Arc::new(AtomicU64::new(0)),
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
transaction_count: Arc::new(AtomicU64::new(0)),
query_count: Arc::new(AtomicU64::new(0)),
@@ -79,11 +80,9 @@ impl ClientStats {
username: &str,
pool_name: &str,
connect_time: Instant,
pool_stats: Arc<PoolStats>,
) -> Self {
Self {
client_id,
pool_stats,
connect_time,
application_name: application_name.to_string(),
username: username.to_string(),
@@ -96,8 +95,6 @@ impl ClientStats {
/// update metrics on the corresponding pool.
pub fn disconnect(&self) {
self.reporter.client_disconnecting(self.client_id);
self.pool_stats
.client_disconnect(self.state.load(Ordering::Relaxed))
}
/// Register a client with the stats system. The stats system uses client_id
@@ -105,27 +102,20 @@ impl ClientStats {
pub fn register(&self, stats: Arc<ClientStats>) {
self.reporter.client_register(self.client_id, stats);
self.state.store(ClientState::Idle, Ordering::Relaxed);
self.pool_stats.cl_idle.fetch_add(1, Ordering::Relaxed);
}
/// Reports a client is done querying the server and is no longer assigned a server connection
pub fn idle(&self) {
self.pool_stats
.client_idle(self.state.load(Ordering::Relaxed));
self.state.store(ClientState::Idle, Ordering::Relaxed);
}
/// Reports a client is waiting for a connection
pub fn waiting(&self) {
self.pool_stats
.client_waiting(self.state.load(Ordering::Relaxed));
self.state.store(ClientState::Waiting, Ordering::Relaxed);
}
/// Reports a client is done waiting for a connection and is about to query the server.
pub fn active(&self) {
self.pool_stats
.client_active(self.state.load(Ordering::Relaxed));
self.state.store(ClientState::Active, Ordering::Relaxed);
}
@@ -144,6 +134,8 @@ impl ClientStats {
pub fn checkout_time(&self, microseconds: u64) {
self.total_wait_time
.fetch_add(microseconds, Ordering::Relaxed);
self.max_wait_time
.fetch_max(microseconds, Ordering::Relaxed);
}
/// Report a query executed by a client against a server

View File

@@ -1,36 +1,131 @@
use crate::config::Pool;
use crate::config::PoolMode;
use crate::pool::PoolIdentifier;
use std::sync::atomic::*;
use std::sync::Arc;
use log::debug;
use super::get_reporter;
use super::Reporter;
use super::{ClientState, ServerState};
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
use std::collections::HashMap;
use std::sync::atomic::*;
#[derive(Debug, Clone, Default)]
use crate::pool::get_all_pools;
#[derive(Debug, Clone)]
/// A struct that holds information about a Pool .
pub struct PoolStats {
// Pool identifier, cannot be changed after creating the instance
identifier: PoolIdentifier,
pub identifier: PoolIdentifier,
pub mode: PoolMode,
pub cl_idle: u64,
pub cl_active: u64,
pub cl_waiting: u64,
pub cl_cancel_req: u64,
pub sv_active: u64,
pub sv_idle: u64,
pub sv_used: u64,
pub sv_tested: u64,
pub sv_login: u64,
pub maxwait: u64,
}
impl PoolStats {
pub fn new(identifier: PoolIdentifier, mode: PoolMode) -> Self {
PoolStats {
identifier,
mode,
cl_idle: 0,
cl_active: 0,
cl_waiting: 0,
cl_cancel_req: 0,
sv_active: 0,
sv_idle: 0,
sv_used: 0,
sv_tested: 0,
sv_login: 0,
maxwait: 0,
}
}
// Pool Config, cannot be changed after creating the instance
config: Pool,
pub fn construct_pool_lookup() -> HashMap<PoolIdentifier, PoolStats> {
let mut map: HashMap<PoolIdentifier, PoolStats> = HashMap::new();
let client_map = super::get_client_stats();
let server_map = super::get_server_stats();
// A reference to the global reporter.
reporter: Reporter,
for (identifier, pool) in get_all_pools() {
map.insert(
identifier.clone(),
PoolStats::new(identifier, pool.settings.pool_mode),
);
}
/// Counters (atomics)
pub cl_idle: Arc<AtomicU64>,
pub cl_active: Arc<AtomicU64>,
pub cl_waiting: Arc<AtomicU64>,
pub cl_cancel_req: Arc<AtomicU64>,
pub sv_active: Arc<AtomicU64>,
pub sv_idle: Arc<AtomicU64>,
pub sv_used: Arc<AtomicU64>,
pub sv_tested: Arc<AtomicU64>,
pub sv_login: Arc<AtomicU64>,
pub maxwait: Arc<AtomicU64>,
for client in client_map.values() {
match map.get_mut(&PoolIdentifier {
db: client.pool_name(),
user: client.username(),
}) {
Some(pool_stats) => {
match client.state.load(Ordering::Relaxed) {
ClientState::Active => pool_stats.cl_active += 1,
ClientState::Idle => pool_stats.cl_idle += 1,
ClientState::Waiting => pool_stats.cl_waiting += 1,
}
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
}
None => debug!("Client from an obselete pool"),
}
}
for server in server_map.values() {
match map.get_mut(&PoolIdentifier {
db: server.pool_name(),
user: server.username(),
}) {
Some(pool_stats) => match server.state.load(Ordering::Relaxed) {
ServerState::Active => pool_stats.sv_active += 1,
ServerState::Idle => pool_stats.sv_idle += 1,
ServerState::Login => pool_stats.sv_login += 1,
ServerState::Tested => pool_stats.sv_tested += 1,
},
None => debug!("Server from an obselete pool"),
}
}
return map;
}
pub fn generate_header() -> Vec<(&'static str, DataType)> {
return vec![
("database", DataType::Text),
("user", DataType::Text),
("pool_mode", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric),
("sv_active", DataType::Numeric),
("sv_idle", DataType::Numeric),
("sv_used", DataType::Numeric),
("sv_tested", DataType::Numeric),
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
}
pub fn generate_row(&self) -> Vec<String> {
return vec![
self.identifier.db.clone(),
self.identifier.user.clone(),
self.mode.to_string(),
self.cl_idle.to_string(),
self.cl_active.to_string(),
self.cl_waiting.to_string(),
self.cl_cancel_req.to_string(),
self.sv_active.to_string(),
self.sv_idle.to_string(),
self.sv_used.to_string(),
self.sv_tested.to_string(),
self.sv_login.to_string(),
(self.maxwait / 1_000_000).to_string(),
(self.maxwait % 1_000_000).to_string(),
];
}
}
impl IntoIterator for PoolStats {
@@ -39,236 +134,18 @@ impl IntoIterator for PoolStats {
fn into_iter(self) -> Self::IntoIter {
vec![
("cl_idle".to_string(), self.cl_idle.load(Ordering::Relaxed)),
(
"cl_active".to_string(),
self.cl_active.load(Ordering::Relaxed),
),
(
"cl_waiting".to_string(),
self.cl_waiting.load(Ordering::Relaxed),
),
(
"cl_cancel_req".to_string(),
self.cl_cancel_req.load(Ordering::Relaxed),
),
(
"sv_active".to_string(),
self.sv_active.load(Ordering::Relaxed),
),
("sv_idle".to_string(), self.sv_idle.load(Ordering::Relaxed)),
("sv_used".to_string(), self.sv_used.load(Ordering::Relaxed)),
(
"sv_tested".to_string(),
self.sv_tested.load(Ordering::Relaxed),
),
(
"sv_login".to_string(),
self.sv_login.load(Ordering::Relaxed),
),
(
"maxwait".to_string(),
self.maxwait.load(Ordering::Relaxed) / 1_000_000,
),
(
"maxwait_us".to_string(),
self.maxwait.load(Ordering::Relaxed) % 1_000_000,
),
("cl_idle".to_string(), self.cl_idle),
("cl_active".to_string(), self.cl_active),
("cl_waiting".to_string(), self.cl_waiting),
("cl_cancel_req".to_string(), self.cl_cancel_req),
("sv_active".to_string(), self.sv_active),
("sv_idle".to_string(), self.sv_idle),
("sv_used".to_string(), self.sv_used),
("sv_tested".to_string(), self.sv_tested),
("sv_login".to_string(), self.sv_login),
("maxwait".to_string(), self.maxwait / 1_000_000),
("maxwait_us".to_string(), self.maxwait % 1_000_000),
]
.into_iter()
}
}
impl PoolStats {
pub fn new(identifier: PoolIdentifier, config: Pool) -> Self {
Self {
identifier,
config,
reporter: get_reporter(),
..Default::default()
}
}
// Getters
pub fn register(&self, stats: Arc<PoolStats>) {
self.reporter.pool_register(self.identifier.clone(), stats);
}
pub fn database(&self) -> String {
self.identifier.db.clone()
}
pub fn user(&self) -> String {
self.identifier.user.clone()
}
pub fn pool_mode(&self) -> PoolMode {
self.config.pool_mode
}
/// Populates an array of strings with counters (used by admin in show pools)
pub fn populate_row(&self, row: &mut Vec<String>) {
for (_key, value) in self.clone() {
row.push(value.to_string());
}
}
/// Deletes the maxwait counter, this is done everytime we obtain metrics
pub fn clear_maxwait(&self) {
self.maxwait.store(0, Ordering::Relaxed);
}
/// Notified when a server of the pool enters login state.
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_login(&self, from: ServerState) {
self.sv_login.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Login {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'active'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_active(&self, from: ServerState) {
self.sv_active.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Active {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'tested'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_tested(&self, from: ServerState) {
self.sv_tested.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Tested {
self.decrease_from_server_state(from);
}
}
/// Notified when a server of the pool become 'idle'
///
/// Arguments:
///
/// `from`: The state of the server that notifies.
pub fn server_idle(&self, from: ServerState) {
self.sv_idle.fetch_add(1, Ordering::Relaxed);
if from != ServerState::Idle {
self.decrease_from_server_state(from);
}
}
/// Notified when a client of the pool become 'waiting'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_waiting(&self, from: ClientState) {
if from != ClientState::Waiting {
self.cl_waiting.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client of the pool become 'active'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_active(&self, from: ClientState) {
if from != ClientState::Active {
self.cl_active.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client of the pool become 'idle'
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_idle(&self, from: ClientState) {
if from != ClientState::Idle {
self.cl_idle.fetch_add(1, Ordering::Relaxed);
self.decrease_from_client_state(from);
}
}
/// Notified when a client disconnects.
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn client_disconnect(&self, from: ClientState) {
let counter = match from {
ClientState::Idle => &self.cl_idle,
ClientState::Waiting => &self.cl_waiting,
ClientState::Active => &self.cl_active,
};
Self::decrease_counter(counter.clone());
}
/// Notified when a server disconnects.
///
/// Arguments:
///
/// `from`: The state of the client that notifies.
pub fn server_disconnect(&self, from: ServerState) {
let counter = match from {
ServerState::Active => &self.sv_active,
ServerState::Idle => &self.sv_idle,
ServerState::Login => &self.sv_login,
ServerState::Tested => &self.sv_tested,
};
Self::decrease_counter(counter.clone());
}
// helpers for counter decrease
fn decrease_from_server_state(&self, from: ServerState) {
let counter = match from {
ServerState::Tested => &self.sv_tested,
ServerState::Active => &self.sv_active,
ServerState::Idle => &self.sv_idle,
ServerState::Login => &self.sv_login,
};
Self::decrease_counter(counter.clone());
}
fn decrease_from_client_state(&self, from: ClientState) {
let counter = match from {
ClientState::Active => &self.cl_active,
ClientState::Idle => &self.cl_idle,
ClientState::Waiting => &self.cl_waiting,
};
Self::decrease_counter(counter.clone());
}
fn decrease_counter(value: Arc<AtomicU64>) {
if value.load(Ordering::Relaxed) > 0 {
value.fetch_sub(1, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_decrease() {
let stat: PoolStats = PoolStats::default();
stat.server_login(ServerState::Login);
stat.server_idle(ServerState::Login);
assert_eq!(stat.sv_login.load(Ordering::Relaxed), 0);
assert_eq!(stat.sv_idle.load(Ordering::Relaxed), 1);
}
}

View File

@@ -1,5 +1,4 @@
use super::AddressStats;
use super::PoolStats;
use super::{get_reporter, Reporter};
use crate::config::Address;
use atomic_enum::atomic_enum;
@@ -38,7 +37,6 @@ pub struct ServerStats {
address: Address,
connect_time: Instant,
pool_stats: Arc<PoolStats>,
reporter: Reporter,
/// Data
@@ -49,6 +47,8 @@ pub struct ServerStats {
pub transaction_count: Arc<AtomicU64>,
pub query_count: Arc<AtomicU64>,
pub error_count: Arc<AtomicU64>,
pub prepared_hit_count: Arc<AtomicU64>,
pub prepared_miss_count: Arc<AtomicU64>,
}
impl Default for ServerStats {
@@ -57,7 +57,6 @@ impl Default for ServerStats {
server_id: 0,
application_name: Arc::new(RwLock::new(String::new())),
address: Address::default(),
pool_stats: Arc::new(PoolStats::default()),
connect_time: Instant::now(),
state: Arc::new(AtomicServerState::new(ServerState::Login)),
bytes_sent: Arc::new(AtomicU64::new(0)),
@@ -66,15 +65,16 @@ impl Default for ServerStats {
query_count: Arc::new(AtomicU64::new(0)),
error_count: Arc::new(AtomicU64::new(0)),
reporter: get_reporter(),
prepared_hit_count: Arc::new(AtomicU64::new(0)),
prepared_miss_count: Arc::new(AtomicU64::new(0)),
}
}
}
impl ServerStats {
pub fn new(address: Address, pool_stats: Arc<PoolStats>, connect_time: Instant) -> Self {
pub fn new(address: Address, connect_time: Instant) -> Self {
Self {
address,
pool_stats,
connect_time,
server_id: rand::random::<i32>(),
..Default::default()
@@ -96,9 +96,6 @@ impl ServerStats {
/// Reports a server connection is no longer assigned to a client
/// and is available for the next client to pick it up
pub fn idle(&self) {
self.pool_stats
.server_idle(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Idle, Ordering::Relaxed);
}
@@ -106,22 +103,16 @@ impl ServerStats {
/// Also updates metrics on the pool regarding server usage.
pub fn disconnect(&self) {
self.reporter.server_disconnecting(self.server_id);
self.pool_stats
.server_disconnect(self.state.load(Ordering::Relaxed))
}
/// Reports a server connection is being tested before being given to a client.
pub fn tested(&self) {
self.set_undefined_application();
self.pool_stats
.server_tested(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Tested, Ordering::Relaxed);
}
/// Reports a server connection is attempting to login.
pub fn login(&self) {
self.pool_stats
.server_login(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Login, Ordering::Relaxed);
self.set_undefined_application();
}
@@ -129,8 +120,6 @@ impl ServerStats {
/// Reports a server connection has been assigned to a client that
/// is about to query the server
pub fn active(&self, application_name: String) {
self.pool_stats
.server_active(self.state.load(Ordering::Relaxed));
self.state.store(ServerState::Active, Ordering::Relaxed);
self.set_application(application_name);
}
@@ -152,11 +141,11 @@ impl ServerStats {
// Helper methods for show_servers
pub fn pool_name(&self) -> String {
self.pool_stats.database()
self.address.pool_name.clone()
}
pub fn username(&self) -> String {
self.pool_stats.user()
self.address.username.clone()
}
pub fn address_name(&self) -> String {
@@ -180,9 +169,6 @@ impl ServerStats {
// Update server stats and address aggregation stats
self.set_application(application_name);
self.address.stats.wait_time_add(microseconds);
self.pool_stats
.maxwait
.fetch_max(microseconds, Ordering::Relaxed);
}
/// Report a query executed by a client against a server
@@ -190,6 +176,7 @@ impl ServerStats {
self.set_application(application_name.to_string());
self.address.stats.query_count_add();
self.address.stats.query_time_add(milliseconds);
self.query_count.fetch_add(1, Ordering::Relaxed);
}
/// Report a transaction executed by a client a server
@@ -216,4 +203,14 @@ impl ServerStats {
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
self.address.stats.bytes_received_add(amount_bytes as u64);
}
/// Report a prepared statement that already exists on the server.
pub fn prepared_cache_hit(&self) {
self.prepared_hit_count.fetch_add(1, Ordering::Relaxed);
}
/// Report a prepared statement that does not exist on the server yet.
pub fn prepared_cache_miss(&self) {
self.prepared_miss_count.fetch_add(1, Ordering::Relaxed);
}
}

View File

@@ -63,6 +63,7 @@ def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.
def test_normal_db_access():
pgcat_start()
conn, cur = connect_db(autocommit=False)
cur.execute("SELECT 1")
res = cur.fetchall()

View File

@@ -11,326 +11,6 @@ describe "Admin" do
processes.pgcat.shutdown
end
describe "SHOW STATS" do
context "clients connect and make one query" do
it "updates *_query_time and *_wait_time" do
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
end
sleep(1)
connections.map(&:close)
# wait for averages to be calculated, we shouldn't do this too often
sleep(15.5)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
expect(results["total_query_time"].to_i).to be_within(200).of(750)
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
end
end
end
describe "SHOW POOLS" do
context "bad credentials" do
it "does not change any stats" do
bad_password_url = URI(pgcat_conn_str)
bad_password_url.password = "wrong"
expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "bad database name" do
it "does not change any stats" do
bad_db_url = URI(pgcat_conn_str)
bad_db_url.path = "/wrong_db"
expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "client connects but issues no queries" do
it "only affects cl_idle stats" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
sleep(1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("20")
expect(results["sv_idle"]).to eq(before_test)
connections.map(&:close)
sleep(1.1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq(before_test)
end
end
context "clients connect and make one query" do
it "only affects cl_idle, sv_idle stats" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(2.5)") }
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_active"]).to eq("5")
expect(results["sv_active"]).to eq("5")
sleep(3)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("5")
connections.map(&:close)
sleep(1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client connects and opens a transaction and closes connection uncleanly" do
it "produces correct statistics" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new do
c.async_exec("BEGIN")
c.async_exec("SELECT pg_sleep(0.01)")
c.close
end
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client fail to checkout connection from the pool" do
it "counts clients as idle" do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
threads = []
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError }
end
sleep(2)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("1")
threads.map(&:join)
connections.map(&:close)
end
end
context "clients connects and disconnect normally" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it 'shows the same number of clients before and after' do
clients_before = clients_connected_to_pool(processes: processes)
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
clients_between = clients_connected_to_pool(processes: processes)
expect(clients_before).not_to eq(clients_between)
connections.each(&:close)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients connects and disconnect abruptly" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
it 'shows the same number of clients before and after' do
threads = []
connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
clients_before = clients_connected_to_pool(processes: processes)
random_string = (0...8).map { (65 + rand(26)).chr }.join
connection_string = "#{pgcat_conn_str}?application_name=#{random_string}"
faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null")
sleep(1)
# psql starts two processes, we only know the pid of the parent, this
# ensure both are killed
`pkill -9 -f '#{random_string}'`
Process.wait(faulty_client)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients overwhelm server pools" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "cl_waiting is updated to show it" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end
sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_waiting"]).to eq("2")
expect(results["cl_active"]).to eq("2")
expect(results["sv_active"]).to eq("2")
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("4")
expect(results["sv_idle"]).to eq("2")
threads.map(&:join)
connections.map(&:close)
end
it "show correct max_wait" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
sleep(4.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("0")
threads.map(&:join)
connections.map(&:close)
end
end
end
describe "SHOW CLIENTS" do
it "reports correct number and application names" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") }
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(21) # count admin clients
expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8)
expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1)
connections[0..5].map(&:close)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(15)
connections[6..].map(&:close)
sleep(1) # Wait for stats to be updated
expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1)
admin_conn.close
end
it "reports correct number of queries and transactions" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") }
connections.each do |c|
c.async_exec("SELECT 1")
c.async_exec("SELECT 2")
c.async_exec("SELECT 3")
c.async_exec("BEGIN")
c.async_exec("SELECT 4")
c.async_exec("SELECT 5")
c.async_exec("COMMIT")
end
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(3)
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
expect(normal_client_results[0]["transaction_count"]).to eq("4")
expect(normal_client_results[1]["transaction_count"]).to eq("4")
expect(normal_client_results[0]["query_count"]).to eq("7")
expect(normal_client_results[1]["query_count"]).to eq("7")
admin_conn.close
connections.map(&:close)
end
end
describe "Manual Banning" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 10) }
before do
@@ -401,7 +81,7 @@ describe "Admin" do
end
end
describe "SHOW users" do
describe "SHOW USERS" do
it "returns the right users" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW USERS")[0]

369
tests/ruby/stats_spec.rb Normal file
View File

@@ -0,0 +1,369 @@
# frozen_string_literal: true
require 'open3'
require_relative 'spec_helper'
describe "Stats" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "SHOW STATS" do
context "clients connect and make one query" do
it "updates *_query_time and *_wait_time" do
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
end
sleep(1)
connections.map(&:close)
# wait for averages to be calculated, we shouldn't do this too often
sleep(15.5)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
expect(results["total_query_time"].to_i).to be_within(200).of(750)
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
end
end
end
describe "SHOW POOLS" do
context "bad credentials" do
it "does not change any stats" do
bad_password_url = URI(pgcat_conn_str)
bad_password_url.password = "wrong"
expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "bad database name" do
it "does not change any stats" do
bad_db_url = URI(pgcat_conn_str)
bad_db_url.path = "/wrong_db"
expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad)
sleep(1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("1")
end
end
context "client connects but issues no queries" do
it "only affects cl_idle stats" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
connections = Array.new(20) { PG::connect(pgcat_conn_str) }
sleep(1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("20")
expect(results["sv_idle"]).to eq(before_test)
connections.map(&:close)
sleep(1.1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq(before_test)
end
end
context "clients connect and make one query" do
it "only affects cl_idle, sv_idle stats" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(2.5)") }
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_active"]).to eq("5")
expect(results["sv_active"]).to eq("5")
sleep(3)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("5")
connections.map(&:close)
sleep(1)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client connects and opens a transaction and closes connection uncleanly" do
it "produces correct statistics" do
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new do
c.async_exec("BEGIN")
c.async_exec("SELECT pg_sleep(0.01)")
c.close
end
end
sleep(1.1)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["sv_idle"]).to eq("5")
end
end
context "client fail to checkout connection from the pool" do
it "counts clients as idle" do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
threads = []
connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError }
end
sleep(2)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("5")
expect(results["sv_idle"]).to eq("1")
threads.map(&:join)
connections.map(&:close)
end
end
context "clients connects and disconnect normally" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it 'shows the same number of clients before and after' do
clients_before = clients_connected_to_pool(processes: processes)
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") rescue nil }
end
clients_between = clients_connected_to_pool(processes: processes)
expect(clients_before).not_to eq(clients_between)
connections.each(&:close)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients connects and disconnect abruptly" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
it 'shows the same number of clients before and after' do
threads = []
connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT 1") }
end
clients_before = clients_connected_to_pool(processes: processes)
random_string = (0...8).map { (65 + rand(26)).chr }.join
connection_string = "#{pgcat_conn_str}?application_name=#{random_string}"
faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null")
sleep(1)
# psql starts two processes, we only know the pid of the parent, this
# ensure both are killed
`pkill -9 -f '#{random_string}'`
Process.wait(faulty_client)
clients_after = clients_connected_to_pool(processes: processes)
expect(clients_before).to eq(clients_after)
end
end
context "clients overwhelm server pools" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "cl_waiting is updated to show it" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end
sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_waiting"]).to eq("2")
expect(results["cl_active"]).to eq("2")
expect(results["sv_active"]).to eq("2")
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
expect(results["cl_idle"]).to eq("4")
expect(results["sv_idle"]).to eq("2")
threads.map(&:join)
connections.map(&:close)
end
it "show correct max_wait" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
connections.map(&:close)
sleep(4.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("0")
threads.map(&:join)
end
end
end
describe "SHOW CLIENTS" do
it "reports correct number and application names" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") }
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(21) # count admin clients
expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8)
expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1)
connections[0..5].map(&:close)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(15)
connections[6..].map(&:close)
sleep(1) # Wait for stats to be updated
expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1)
admin_conn.close
end
it "reports correct number of queries and transactions" do
conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") }
connections.each do |c|
c.async_exec("SELECT 1")
c.async_exec("SELECT 2")
c.async_exec("SELECT 3")
c.async_exec("BEGIN")
c.async_exec("SELECT 4")
c.async_exec("SELECT 5")
c.async_exec("COMMIT")
end
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
sleep(1) # Wait for stats to be updated
results = admin_conn.async_exec("SHOW CLIENTS")
expect(results.count).to eq(3)
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
expect(normal_client_results[0]["transaction_count"]).to eq("4")
expect(normal_client_results[1]["transaction_count"]).to eq("4")
expect(normal_client_results[0]["query_count"]).to eq("7")
expect(normal_client_results[1]["query_count"]).to eq("7")
admin_conn.close
connections.map(&:close)
end
end
describe "Query Storm" do
context "when the proxy receives overwhelmingly large number of short quick queries" do
it "should not have lingering clients or active servers" do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
Array.new(40) do
Thread.new do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SELECT pg_sleep(0.1)")
rescue PG::SystemError
ensure
conn.close
end
end.each(&:join)
sleep 1
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_waiting cl_cancel_req sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
admin_conn.close
end
end
end
end

1
tests/rust/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
target/

1322
tests/rust/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

10
tests/rust/Cargo.toml Normal file
View File

@@ -0,0 +1,10 @@
[package]
name = "rust"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
sqlx = { version = "0.6.2", features = [ "runtime-tokio-rustls", "postgres", "json", "tls", "migrate", "time", "uuid", "ipnetwork"] }
tokio = { version = "1", features = ["full"] }

29
tests/rust/src/main.rs Normal file
View File

@@ -0,0 +1,29 @@
#[tokio::main]
async fn main() {
test_prepared_statements().await;
}
async fn test_prepared_statements() {
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db")
.await
.unwrap();
let mut handles = Vec::new();
for _ in 0..5 {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for _ in 0..1000 {
sqlx::query("SELECT 1").fetch_all(&pool).await.unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}