Compare commits

...

16 Commits

Author SHA1 Message Date
Lev
7a419f40ea Revert "Require a reason when marking a server bad (#654)"
This reverts commit 4dbef49ec9.
2023-12-04 19:59:53 -08:00
Lev
54c4ad140d Revert "Not sure how this sneaked past CI"
This reverts commit 4c5498b915.
2023-12-04 19:59:42 -08:00
Lev
190e32ae85 Revert "Reset wait times when checked out successfully (#656)"
This reverts commit ec3920d60f.
2023-12-04 19:59:34 -08:00
Lev Kokotov
ec3920d60f Reset wait times when checked out successfully (#656) 2023-12-04 18:33:08 -08:00
Lev
4c5498b915 Not sure how this sneaked past CI 2023-12-04 18:30:03 -08:00
Daniel Babiak
0e8064b049 only report wait times from clients currently waiting to match behavior of pgbouncer (#655)
* Change maxwait to only report wait times from clients currently waiting to match behavior of pgbouncer

* Fix tests
2023-12-04 18:19:51 -08:00
Alec
4dbef49ec9 Require a reason when marking a server bad (#654)
When calling mark_bad require a reason so it can be logged rather than
the generic message
2023-12-04 16:09:41 -08:00
Lev Kokotov
bc07dc9c81 Broken blog link 2 2023-12-03 21:01:23 -08:00
Lev Kokotov
9b8166b313 Broken blog link (#652)
Update README.md
2023-12-03 20:58:39 -08:00
Lev Kokotov
e58d69f3de Fix deb build overwriting config (#651) 2023-12-03 20:27:44 -08:00
Lev Kokotov
e76d720ffb Dont cache prepared statement with errors (#647)
* Fix prepared statement not found when prepared stmt has error

* cleanup debug

* remove more debug msgs

* sure debugged this..

* version bump

* add rust tests
2023-11-28 21:13:30 -08:00
Calvin Hughes
998cc16a3c Expose clients maxwait time in SHOW CLIENTS response via admin (#639)
* Expose clients maxwait time in SHOW CLIENTS response via PgCat admin
Displays the maxwait via maxwait_seconds and maxwait_us columns for each client that can be used to track down the wait time per client in a case where the overall pool stats shows waiting time. The maxwait_us, similar to the pool stats setup, is configured to display as a remainder alongside the maxwait_seconds.

* Use maxwait instead of maxwait_seconds to match pools column name

---------

Co-authored-by: Calvin Hughes <9379992+calvinhughes@users.noreply.github.com>
2023-11-13 11:24:39 -08:00
Jakob Schultz-Falk
7c37da2fad Support unnamed prepared statements (#635)
* Add golang test suite to reproduce issue with unnamed parameterized prepared statements

* Allow caching of unnamed prepared statements

* Passthrough describe on portals

* Remove unneeded kill

* Update Dockerfile.ci with golang

* Move out update of Dockerfiles to separate PR
2023-11-08 16:36:45 -08:00
Jakob Schultz-Falk
b45c6b1d23 Update Dockerfile.ci with golang (#637) 2023-11-08 08:25:49 -08:00
Lev Kokotov
dae240d30c Add connet_timeout and idle_timeout to the user (#634)
* Add connect_timeout to the user

* Allow user to override connect timeout

* version

* lock

* Add both timeouts to the user
2023-11-06 12:18:52 -08:00
Lev Kokotov
b52ea8e7f1 bump version (#629) 2023-10-26 10:50:45 -07:00
26 changed files with 534 additions and 53 deletions

View File

@@ -108,8 +108,24 @@ cd ../..
pip3 install -r tests/python/requirements.txt
python3 tests/python/tests.py || exit 1
#
# Go tests
# Starts its own pgcat server
#
pushd tests/go
/usr/local/go/bin/go test || exit 1
popd
start_pgcat "info"
#
# Rust tests
#
cd tests/rust
cargo run
cd ../../
# Admin tests
export PGPASSWORD=admin_pass
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
packageVersion:
default: "1.1.2-dev"
default: "1.1.2-dev1"
jobs:
build:
strategy:

2
Cargo.lock generated
View File

@@ -1020,7 +1020,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.1.2-dev"
version = "1.1.2-dev4"
dependencies = [
"arc-swap",
"async-trait",

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.1.2-dev"
version = "1.1.2-dev4"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -9,6 +9,9 @@ RUN sudo apt-get update && \
sudo apt-get upgrade curl && \
cargo install cargo-binutils rustfilt && \
rustup component add llvm-tools-preview && \
pip3 install psycopg2 && sudo gem install bundler && \
pip3 install psycopg2 && sudo gem install bundler && \
wget -O /tmp/toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i /tmp/toxiproxy-2.4.0.deb
RUN wget -O /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz

View File

@@ -40,7 +40,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
<img src="./images/postgresml.webp" height="70" width="auto">
</a>
</td>
@@ -57,7 +57,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
PostgresML
</a>
</td>

View File

@@ -301,6 +301,8 @@ username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 15000
connect_timeout = 1000
idle_timeout = 1000
# Shard configs are structured as pool.<pool_name>.shards.<shard_id>
# Each shard config contains a list of servers that make up the shard

View File

@@ -690,6 +690,8 @@ where
("query_count", DataType::Numeric),
("error_count", DataType::Numeric),
("age_seconds", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
let new_map = get_client_stats();
@@ -697,6 +699,7 @@ where
res.put(row_description(&columns));
for (_, client) in new_map {
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
let row = vec![
format!("{:#010X}", client.client_id()),
client.pool_name(),
@@ -710,6 +713,8 @@ where
.duration_since(client.connect_time())
.as_secs()
.to_string(),
(max_wait / 1_000_000).to_string(),
(max_wait % 1_000_000).to_string(),
];
res.put(data_row(&row));

View File

@@ -79,6 +79,8 @@ impl AuthPassthrough {
pool_mode: None,
server_lifetime: None,
min_pool_size: None,
connect_timeout: None,
idle_timeout: None,
};
let user = &address.username;

View File

@@ -1149,7 +1149,7 @@ where
// This reads the first byte without advancing the internal pointer and mutating the bytes
let code = *message.first().unwrap() as char;
trace!("Message: {}", code);
trace!("Client message: {}", code);
match code {
// Query
@@ -1188,6 +1188,7 @@ where
};
}
}
debug!("Sending query to server");
self.send_and_receive_loop(
@@ -1320,6 +1321,7 @@ where
{
match protocol_data {
ExtendedProtocolData::Parse { data, metadata } => {
debug!("Have parse in extended buffer");
let (parse, hash) = match metadata {
Some(metadata) => metadata,
None => {
@@ -1656,11 +1658,25 @@ where
) -> Result<(), Error> {
match self.prepared_statements.get(&client_name) {
Some((parse, hash)) => {
debug!("Prepared statement `{}` found in cache", parse.name);
debug!("Prepared statement `{}` found in cache", client_name);
// In this case we want to send the parse message to the server
// since pgcat is initiating the prepared statement on this specific server
self.register_parse_to_server_cache(true, hash, parse, pool, server, address)
.await?;
match self
.register_parse_to_server_cache(true, hash, parse, pool, server, address)
.await
{
Ok(_) => (),
Err(err) => match err {
Error::PreparedStatementError => {
debug!("Removed {} from client cache", client_name);
self.prepared_statements.remove(&client_name);
}
_ => {
return Err(err);
}
},
}
}
None => {
@@ -1689,11 +1705,20 @@ where
// We want to promote this in the pool's LRU
pool.promote_prepared_statement_hash(hash);
debug!("Checking for prepared statement {}", parse.name);
if let Err(err) = server
.register_prepared_statement(parse, should_send_parse_to_server)
.await
{
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
match err {
// Don't ban for this.
Error::PreparedStatementError => (),
_ => {
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
}
};
return Err(err);
}
@@ -1704,18 +1729,14 @@ where
/// and also the pool's statement cache. Add it to extended protocol data.
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
let client_given_name = match self.prepared_statements_enabled {
true => Parse::get_name(&message)?,
false => "".to_string(),
};
if client_given_name.is_empty() {
if !self.prepared_statements_enabled {
debug!("Anonymous parse message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_parse(message, None));
return Ok(());
}
let client_given_name = Parse::get_name(&message)?;
let parse: Parse = (&message).try_into()?;
// Compute the hash of the parse statement
@@ -1753,18 +1774,15 @@ where
/// saved in the client cache.
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
let client_given_name = match self.prepared_statements_enabled {
true => Bind::get_name(&message)?,
false => "".to_string(),
};
if client_given_name.is_empty() {
if !self.prepared_statements_enabled {
debug!("Anonymous bind message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_bind(message, None));
return Ok(());
}
let client_given_name = Bind::get_name(&message)?;
match self.prepared_statements.get(&client_given_name) {
Some((rewritten_parse, _)) => {
let message = Bind::rename(message, &rewritten_parse.name)?;
@@ -1807,12 +1825,7 @@ where
/// saved in the client cache.
async fn buffer_describe(&mut self, message: BytesMut) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
let describe: Describe = match self.prepared_statements_enabled {
true => (&message).try_into()?,
false => Describe::empty_new(),
};
if describe.anonymous() {
if !self.prepared_statements_enabled {
debug!("Anonymous describe message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_describe(message, None));
@@ -1820,6 +1833,15 @@ where
return Ok(());
}
let describe: Describe = (&message).try_into()?;
if describe.target == 'P' {
debug!("Portal describe message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_describe(message, None));
return Ok(());
}
let client_given_name = describe.statement_name.clone();
match self.prepared_statements.get(&client_given_name) {

View File

@@ -216,6 +216,8 @@ pub struct User {
pub server_lifetime: Option<u64>,
#[serde(default)] // 0
pub statement_timeout: u64,
pub connect_timeout: Option<u64>,
pub idle_timeout: Option<u64>,
}
impl Default for User {
@@ -230,6 +232,8 @@ impl Default for User {
statement_timeout: 0,
pool_mode: None,
server_lifetime: None,
connect_timeout: None,
idle_timeout: None,
}
}
}
@@ -1307,6 +1311,24 @@ impl Config {
None => "default".to_string(),
}
);
info!(
"[pool: {}][user: {}] Connection timeout: {}",
pool_name,
user.1.username,
match user.1.connect_timeout {
Some(connect_timeout) => format!("{}ms", connect_timeout),
None => "not set".to_string(),
}
);
info!(
"[pool: {}][user: {}] Idle timeout: {}",
pool_name,
user.1.username,
match user.1.idle_timeout {
Some(idle_timeout) => format!("{}ms", idle_timeout),
None => "not set".to_string(),
}
);
}
}
}

View File

@@ -29,6 +29,7 @@ pub enum Error {
QueryRouterParserError(String),
QueryRouterError(String),
InvalidShardId(usize),
PreparedStatementError,
}
#[derive(Clone, PartialEq, Debug)]

View File

@@ -1109,7 +1109,7 @@ pub struct Describe {
#[allow(dead_code)]
len: i32,
target: char,
pub target: char,
pub statement_name: String,
}

View File

@@ -436,14 +436,20 @@ impl ConnectionPool {
pool_config.prepared_statements_cache_size,
);
let connect_timeout = match pool_config.connect_timeout {
let connect_timeout = match user.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => config.general.connect_timeout,
None => match pool_config.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => config.general.connect_timeout,
},
};
let idle_timeout = match pool_config.idle_timeout {
let idle_timeout = match user.idle_timeout {
Some(idle_timeout) => idle_timeout,
None => config.general.idle_timeout,
None => match pool_config.idle_timeout {
Some(idle_timeout) => idle_timeout,
None => config.general.idle_timeout,
},
};
let server_lifetime = match user.server_lifetime {

View File

@@ -7,7 +7,7 @@ use lru::LruCache;
use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
use postgres_protocol::message;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use std::net::IpAddr;
use std::num::NonZeroUsize;
@@ -325,6 +325,9 @@ pub struct Server {
/// Prepared statements
prepared_statement_cache: Option<LruCache<String, ()>>,
/// Prepared statement being currently registered on the server.
registering_prepared_statement: VecDeque<String>,
}
impl Server {
@@ -827,6 +830,7 @@ impl Server {
NonZeroUsize::new(prepared_statement_cache_size).unwrap(),
)),
},
registering_prepared_statement: VecDeque::new(),
};
return Ok(server);
@@ -956,7 +960,6 @@ impl Server {
// There is no more data available from the server.
self.data_available = false;
break;
}
@@ -966,6 +969,23 @@ impl Server {
self.in_copy_mode = false;
}
// Remove the prepared statement from the cache, it has a syntax error or something else bad happened.
if let Some(prepared_stmt_name) =
self.registering_prepared_statement.pop_front()
{
if let Some(ref mut cache) = self.prepared_statement_cache {
if let Some(_removed) = cache.pop(&prepared_stmt_name) {
debug!(
"Removed {} from prepared statement cache",
prepared_stmt_name
);
} else {
// Shouldn't happen.
debug!("Prepared statement {} was not cached", prepared_stmt_name);
}
}
}
if self.prepared_statement_cache.is_some() {
let error_message = PgErrorMsg::parse(&message)?;
if error_message.message == "cached plan must not change result type" {
@@ -1068,6 +1088,11 @@ impl Server {
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
'c' => (),
// Parse complete successfully
'1' => {
self.registering_prepared_statement.pop_front();
}
// Anything else, e.g. errors, notices, etc.
// Keep buffering until ReadyForQuery shows up.
_ => (),
@@ -1107,7 +1132,7 @@ impl Server {
has_it
}
pub fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
None => return None,
@@ -1129,7 +1154,7 @@ impl Server {
None
}
pub fn remove_prepared_statement_from_cache(&mut self, name: &str) {
fn remove_prepared_statement_from_cache(&mut self, name: &str) {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
None => return,
@@ -1145,6 +1170,9 @@ impl Server {
should_send_parse_to_server: bool,
) -> Result<(), Error> {
if !self.has_prepared_statement(&parse.name) {
self.registering_prepared_statement
.push_back(parse.name.clone());
let mut bytes = BytesMut::new();
if should_send_parse_to_server {
@@ -1176,7 +1204,13 @@ impl Server {
}
};
Ok(())
// If it's not there, something went bad, I'm guessing bad syntax or permissions error
// on the server.
if !self.has_prepared_statement(&parse.name) {
Err(Error::PreparedStatementError)
} else {
Ok(())
}
}
/// If the server is still inside a transaction.
@@ -1186,6 +1220,7 @@ impl Server {
self.in_transaction
}
/// Currently copying data from client to server or vice-versa.
pub fn in_copy_mode(&self) -> bool {
self.in_copy_mode
}

View File

@@ -38,8 +38,10 @@ pub struct ClientStats {
/// Total time spent waiting for a connection from pool, measures in microseconds
pub total_wait_time: Arc<AtomicU64>,
/// Maximum time spent waiting for a connection from pool, measures in microseconds
pub max_wait_time: Arc<AtomicU64>,
/// When this client started waiting.
/// Stored as microseconds since connect_time so it can fit in an AtomicU64 instead
/// of us using an "AtomicInstant"
pub wait_start: Arc<AtomicU64>,
/// Current state of the client
pub state: Arc<AtomicClientState>,
@@ -63,7 +65,7 @@ impl Default for ClientStats {
username: String::new(),
pool_name: String::new(),
total_wait_time: Arc::new(AtomicU64::new(0)),
max_wait_time: Arc::new(AtomicU64::new(0)),
wait_start: Arc::new(AtomicU64::new(0)),
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
transaction_count: Arc::new(AtomicU64::new(0)),
query_count: Arc::new(AtomicU64::new(0)),
@@ -111,6 +113,11 @@ impl ClientStats {
/// Reports a client is waiting for a connection
pub fn waiting(&self) {
// safe to truncate, we only lose info if duration is greater than ~585,000 years
self.wait_start.store(
Instant::now().duration_since(self.connect_time).as_micros() as u64,
Ordering::Relaxed,
);
self.state.store(ClientState::Waiting, Ordering::Relaxed);
}
@@ -134,8 +141,6 @@ impl ClientStats {
pub fn checkout_time(&self, microseconds: u64) {
self.total_wait_time
.fetch_add(microseconds, Ordering::Relaxed);
self.max_wait_time
.fetch_max(microseconds, Ordering::Relaxed);
}
/// Report a query executed by a client against a server

View File

@@ -4,6 +4,7 @@ use super::{ClientState, ServerState};
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
use std::collections::HashMap;
use std::sync::atomic::*;
use tokio::time::Instant;
use crate::pool::get_all_pools;
@@ -53,6 +54,7 @@ impl PoolStats {
);
}
let now = Instant::now();
for client in client_map.values() {
match map.get_mut(&PoolIdentifier {
db: client.pool_name(),
@@ -62,10 +64,16 @@ impl PoolStats {
match client.state.load(Ordering::Relaxed) {
ClientState::Active => pool_stats.cl_active += 1,
ClientState::Idle => pool_stats.cl_idle += 1,
ClientState::Waiting => pool_stats.cl_waiting += 1,
ClientState::Waiting => {
pool_stats.cl_waiting += 1;
// wait_start is measured as microseconds since connect_time
// so compute wait_time as (now() - connect_time) - (wait_start - connect_time)
let duration_since_connect = now.duration_since(client.connect_time());
let wait_time = (duration_since_connect.as_micros() as u64)
- client.wait_start.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time);
}
}
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
}
None => debug!("Client from an obselete pool"),
}

View File

@@ -8,3 +8,6 @@ RUN rustup component add llvm-tools-preview
RUN sudo gem install bundler
RUN wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i toxiproxy-2.4.0.deb
RUN wget -O go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm go1.21.3.linux-$(dpkg --print-architecture).tar.gz

5
tests/go/go.mod Normal file
View File

@@ -0,0 +1,5 @@
module pgcat
go 1.21
require github.com/lib/pq v1.10.9

2
tests/go/go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

162
tests/go/pgcat.toml Normal file
View File

@@ -0,0 +1,162 @@
#
# PgCat config example.
#
#
# General pooler settings
[general]
# What IP to run on, 0.0.0.0 means accessible from everywhere.
host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = "${PORT}"
# Whether to enable prometheus exporter or not.
enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms).
connect_timeout = 1000
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000
# How long to keep connection available for immediate re-use, without running a healthcheck query on it
healthcheck_delay = 30000
# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 5000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
# If we should log client connections
log_client_connections = false
# If we should log client disconnections
log_client_disconnections = false
# Reload config automatically if it changes.
autoreload = 15000
server_round_robin = false
# TLS
tls_certificate = "../../.circleci/server.cert"
tls_private_key = "../../.circleci/server.key"
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
admin_username = "admin_user"
admin_password = "admin_pass"
# pool
# configs are structured as pool.<pool_name>
# the pool_name is what clients use as database name when connecting
# For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded_db"
[pools.sharded_db]
# Pool mode (see PgBouncer docs for more).
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"
# Query parser. If enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"
# Prepared statements cache size.
prepared_statements_cache_size = 500
# Credentials for users that may connect to this cluster
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
# Maximum number of server connections that can be established for this user
# The maximum number of connection from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users.
pool_size = 5
statement_timeout = 0
[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 30000
# Shard 0
[pools.sharded_db.shards.0]
# [ host, port, role ]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
# Database name (e.g. "postgres")
database = "shard0"
[pools.sharded_db.shards.1]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard1"
[pools.sharded_db.shards.2]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard2"
[pools.simple_db]
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"
[pools.simple_db.users.0]
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 30000
[pools.simple_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
database = "some_db"

52
tests/go/prepared_test.go Normal file
View File

@@ -0,0 +1,52 @@
package pgcat
import (
"context"
"database/sql"
"fmt"
_ "github.com/lib/pq"
"testing"
)
func Test(t *testing.T) {
t.Cleanup(setup(t))
t.Run("Named parameterized prepared statement works", namedParameterizedPreparedStatement)
t.Run("Unnamed parameterized prepared statement works", unnamedParameterizedPreparedStatement)
}
func namedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
stmt, err := db.Prepare("SELECT $1")
if err != nil {
t.Fatalf("could not prepare: %+v", err)
}
for i := 0; i < 100; i++ {
rows, err := stmt.Query(1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}
func unnamedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
for i := 0; i < 100; i++ {
// Under the hood QueryContext generates an unnamed parameterized prepared statement
rows, err := db.QueryContext(context.Background(), "SELECT $1", 1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}

81
tests/go/setup.go Normal file
View File

@@ -0,0 +1,81 @@
package pgcat
import (
"context"
"database/sql"
_ "embed"
"fmt"
"math/rand"
"os"
"os/exec"
"strings"
"testing"
"time"
)
//go:embed pgcat.toml
var pgcatCfg string
var port = rand.Intn(32760-20000) + 20000
func setup(t *testing.T) func() {
cfg, err := os.CreateTemp("/tmp", "pgcat_cfg_*.toml")
if err != nil {
t.Fatalf("could not create temp file: %+v", err)
}
pgcatCfg = strings.Replace(pgcatCfg, "\"${PORT}\"", fmt.Sprintf("%d", port), 1)
_, err = cfg.Write([]byte(pgcatCfg))
if err != nil {
t.Fatalf("could not write temp file: %+v", err)
}
commandPath := "../../target/debug/pgcat"
if os.Getenv("CARGO_TARGET_DIR") != "" {
commandPath = os.Getenv("CARGO_TARGET_DIR") + "/debug/pgcat"
}
cmd := exec.Command(commandPath, cfg.Name())
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
go func() {
err = cmd.Run()
if err != nil {
t.Errorf("could not run pgcat: %+v", err)
}
}()
deadline, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancelFunc()
for {
select {
case <-deadline.Done():
break
case <-time.After(50 * time.Millisecond):
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=pgcat user=admin_user password=admin_pass sslmode=disable", port))
if err != nil {
continue
}
rows, err := db.QueryContext(deadline, "SHOW STATS")
if err != nil {
continue
}
_ = rows.Close()
_ = db.Close()
break
}
break
}
return func() {
err := cmd.Process.Signal(os.Interrupt)
if err != nil {
t.Fatalf("could not interrupt pgcat: %+v", err)
}
err = os.Remove(cfg.Name())
if err != nil {
t.Fatalf("could not remove temp file: %+v", err)
}
}
}

View File

@@ -233,7 +233,7 @@ describe "Stats" do
sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
@@ -260,12 +260,20 @@ describe "Stats" do
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
# two connections waiting => they report wait time
sleep(1.1) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(100_000)
sleep(2.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
# no connections waiting => no reported wait time
expect(results["maxwait"]).to eq("0")
expect(results["maxwait_us"]).to eq("0")
connections.map(&:close)
sleep(4.5) # Allow time for stats to update
@@ -329,6 +337,40 @@ describe "Stats" do
admin_conn.close
connections.map(&:close)
end
context "when client has waited for a server" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "shows correct maxwait" do
threads = []
connections = Array.new(3) { |i| PG::connect("#{pgcat_conn_str}?application_name=app#{i}") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW CLIENTS")
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
non_waiting_clients = normal_client_results.select { |c| c["maxwait"] == "0" }
waiting_clients = normal_client_results.select { |c| c["maxwait"].to_i > 0 }
expect(non_waiting_clients.count).to eq(2)
non_waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_between(0, 50_000)
end
expect(waiting_clients.count).to eq(1)
waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_within(200_000).of(500_000)
end
admin_conn.close
connections.map(&:close)
end
end
end

View File

@@ -16,7 +16,14 @@ async fn test_prepared_statements() {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for _ in 0..1000 {
sqlx::query("SELECT 1").fetch_all(&pool).await.unwrap();
match sqlx::query("SELECT one").fetch_all(&pool).await {
Ok(_) => (),
Err(err) => {
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err);
}
}
}
}
});

View File

@@ -22,7 +22,7 @@ mkdir -p "$deb_dir/etc/systemd/system"
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
chmod +x "$deb_dir/usr/bin/pgcat"
cp pgcat.toml "$deb_dir/etc/pgcat.toml"
cp pgcat.toml "$deb_dir/etc/pgcat.example.toml"
cp pgcat.service "$deb_dir/etc/systemd/system/pgcat.service"
(cat control | envsubst) > "$deb_dir/DEBIAN/control"