Compare commits

..

1 Commits

Author SHA1 Message Date
Lev Kokotov
9b41cc2639 bump version 2023-10-26 10:50:06 -07:00
25 changed files with 52 additions and 533 deletions

View File

@@ -108,24 +108,8 @@ cd ../..
pip3 install -r tests/python/requirements.txt
python3 tests/python/tests.py || exit 1
#
# Go tests
# Starts its own pgcat server
#
pushd tests/go
/usr/local/go/bin/go test || exit 1
popd
start_pgcat "info"
#
# Rust tests
#
cd tests/rust
cargo run
cd ../../
# Admin tests
export PGPASSWORD=admin_pass
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null

2
Cargo.lock generated
View File

@@ -1020,7 +1020,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.1.2-dev1"
dependencies = [
"arc-swap",
"async-trait",

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.1.2-dev1"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -9,9 +9,6 @@ RUN sudo apt-get update && \
sudo apt-get upgrade curl && \
cargo install cargo-binutils rustfilt && \
rustup component add llvm-tools-preview && \
pip3 install psycopg2 && sudo gem install bundler && \
pip3 install psycopg2 && sudo gem install bundler && \
wget -O /tmp/toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i /tmp/toxiproxy-2.4.0.deb
RUN wget -O /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz

View File

@@ -40,7 +40,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
<img src="./images/postgresml.webp" height="70" width="auto">
</a>
</td>
@@ -57,7 +57,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
</a>
</td>
<td>
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
PostgresML
</a>
</td>

View File

@@ -301,8 +301,6 @@ username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 15000
connect_timeout = 1000
idle_timeout = 1000
# Shard configs are structured as pool.<pool_name>.shards.<shard_id>
# Each shard config contains a list of servers that make up the shard

View File

@@ -690,8 +690,6 @@ where
("query_count", DataType::Numeric),
("error_count", DataType::Numeric),
("age_seconds", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
let new_map = get_client_stats();
@@ -699,7 +697,6 @@ where
res.put(row_description(&columns));
for (_, client) in new_map {
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
let row = vec![
format!("{:#010X}", client.client_id()),
client.pool_name(),
@@ -713,8 +710,6 @@ where
.duration_since(client.connect_time())
.as_secs()
.to_string(),
(max_wait / 1_000_000).to_string(),
(max_wait % 1_000_000).to_string(),
];
res.put(data_row(&row));

View File

@@ -79,8 +79,6 @@ impl AuthPassthrough {
pool_mode: None,
server_lifetime: None,
min_pool_size: None,
connect_timeout: None,
idle_timeout: None,
};
let user = &address.username;

View File

@@ -1149,7 +1149,7 @@ where
// This reads the first byte without advancing the internal pointer and mutating the bytes
let code = *message.first().unwrap() as char;
trace!("Client message: {}", code);
trace!("Message: {}", code);
match code {
// Query
@@ -1188,7 +1188,6 @@ where
};
}
}
debug!("Sending query to server");
self.send_and_receive_loop(
@@ -1321,7 +1320,6 @@ where
{
match protocol_data {
ExtendedProtocolData::Parse { data, metadata } => {
debug!("Have parse in extended buffer");
let (parse, hash) = match metadata {
Some(metadata) => metadata,
None => {
@@ -1658,25 +1656,11 @@ where
) -> Result<(), Error> {
match self.prepared_statements.get(&client_name) {
Some((parse, hash)) => {
debug!("Prepared statement `{}` found in cache", client_name);
debug!("Prepared statement `{}` found in cache", parse.name);
// In this case we want to send the parse message to the server
// since pgcat is initiating the prepared statement on this specific server
match self
.register_parse_to_server_cache(true, hash, parse, pool, server, address)
.await
{
Ok(_) => (),
Err(err) => match err {
Error::PreparedStatementError => {
debug!("Removed {} from client cache", client_name);
self.prepared_statements.remove(&client_name);
}
_ => {
return Err(err);
}
},
}
self.register_parse_to_server_cache(true, hash, parse, pool, server, address)
.await?;
}
None => {
@@ -1705,20 +1689,11 @@ where
// We want to promote this in the pool's LRU
pool.promote_prepared_statement_hash(hash);
debug!("Checking for prepared statement {}", parse.name);
if let Err(err) = server
.register_prepared_statement(parse, should_send_parse_to_server)
.await
{
match err {
// Don't ban for this.
Error::PreparedStatementError => (),
_ => {
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
}
};
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
return Err(err);
}
@@ -1729,14 +1704,18 @@ where
/// and also the pool's statement cache. Add it to extended protocol data.
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
if !self.prepared_statements_enabled {
let client_given_name = match self.prepared_statements_enabled {
true => Parse::get_name(&message)?,
false => "".to_string(),
};
if client_given_name.is_empty() {
debug!("Anonymous parse message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_parse(message, None));
return Ok(());
}
let client_given_name = Parse::get_name(&message)?;
let parse: Parse = (&message).try_into()?;
// Compute the hash of the parse statement
@@ -1774,15 +1753,18 @@ where
/// saved in the client cache.
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
if !self.prepared_statements_enabled {
let client_given_name = match self.prepared_statements_enabled {
true => Bind::get_name(&message)?,
false => "".to_string(),
};
if client_given_name.is_empty() {
debug!("Anonymous bind message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_bind(message, None));
return Ok(());
}
let client_given_name = Bind::get_name(&message)?;
match self.prepared_statements.get(&client_given_name) {
Some((rewritten_parse, _)) => {
let message = Bind::rename(message, &rewritten_parse.name)?;
@@ -1825,7 +1807,12 @@ where
/// saved in the client cache.
async fn buffer_describe(&mut self, message: BytesMut) -> Result<(), Error> {
// Avoid parsing if prepared statements not enabled
if !self.prepared_statements_enabled {
let describe: Describe = match self.prepared_statements_enabled {
true => (&message).try_into()?,
false => Describe::empty_new(),
};
if describe.anonymous() {
debug!("Anonymous describe message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_describe(message, None));
@@ -1833,15 +1820,6 @@ where
return Ok(());
}
let describe: Describe = (&message).try_into()?;
if describe.target == 'P' {
debug!("Portal describe message");
self.extended_protocol_data_buffer
.push_back(ExtendedProtocolData::create_new_describe(message, None));
return Ok(());
}
let client_given_name = describe.statement_name.clone();
match self.prepared_statements.get(&client_given_name) {

View File

@@ -216,8 +216,6 @@ pub struct User {
pub server_lifetime: Option<u64>,
#[serde(default)] // 0
pub statement_timeout: u64,
pub connect_timeout: Option<u64>,
pub idle_timeout: Option<u64>,
}
impl Default for User {
@@ -232,8 +230,6 @@ impl Default for User {
statement_timeout: 0,
pool_mode: None,
server_lifetime: None,
connect_timeout: None,
idle_timeout: None,
}
}
}
@@ -1311,24 +1307,6 @@ impl Config {
None => "default".to_string(),
}
);
info!(
"[pool: {}][user: {}] Connection timeout: {}",
pool_name,
user.1.username,
match user.1.connect_timeout {
Some(connect_timeout) => format!("{}ms", connect_timeout),
None => "not set".to_string(),
}
);
info!(
"[pool: {}][user: {}] Idle timeout: {}",
pool_name,
user.1.username,
match user.1.idle_timeout {
Some(idle_timeout) => format!("{}ms", idle_timeout),
None => "not set".to_string(),
}
);
}
}
}

View File

@@ -29,7 +29,6 @@ pub enum Error {
QueryRouterParserError(String),
QueryRouterError(String),
InvalidShardId(usize),
PreparedStatementError,
}
#[derive(Clone, PartialEq, Debug)]

View File

@@ -1109,7 +1109,7 @@ pub struct Describe {
#[allow(dead_code)]
len: i32,
pub target: char,
target: char,
pub statement_name: String,
}

View File

@@ -436,20 +436,14 @@ impl ConnectionPool {
pool_config.prepared_statements_cache_size,
);
let connect_timeout = match user.connect_timeout {
let connect_timeout = match pool_config.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => match pool_config.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => config.general.connect_timeout,
},
None => config.general.connect_timeout,
};
let idle_timeout = match user.idle_timeout {
let idle_timeout = match pool_config.idle_timeout {
Some(idle_timeout) => idle_timeout,
None => match pool_config.idle_timeout {
Some(idle_timeout) => idle_timeout,
None => config.general.idle_timeout,
},
None => config.general.idle_timeout,
};
let server_lifetime = match user.server_lifetime {

View File

@@ -7,7 +7,7 @@ use lru::LruCache;
use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
use postgres_protocol::message;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::mem;
use std::net::IpAddr;
use std::num::NonZeroUsize;
@@ -325,9 +325,6 @@ pub struct Server {
/// Prepared statements
prepared_statement_cache: Option<LruCache<String, ()>>,
/// Prepared statement being currently registered on the server.
registering_prepared_statement: VecDeque<String>,
}
impl Server {
@@ -830,7 +827,6 @@ impl Server {
NonZeroUsize::new(prepared_statement_cache_size).unwrap(),
)),
},
registering_prepared_statement: VecDeque::new(),
};
return Ok(server);
@@ -960,6 +956,7 @@ impl Server {
// There is no more data available from the server.
self.data_available = false;
break;
}
@@ -969,23 +966,6 @@ impl Server {
self.in_copy_mode = false;
}
// Remove the prepared statement from the cache, it has a syntax error or something else bad happened.
if let Some(prepared_stmt_name) =
self.registering_prepared_statement.pop_front()
{
if let Some(ref mut cache) = self.prepared_statement_cache {
if let Some(_removed) = cache.pop(&prepared_stmt_name) {
debug!(
"Removed {} from prepared statement cache",
prepared_stmt_name
);
} else {
// Shouldn't happen.
debug!("Prepared statement {} was not cached", prepared_stmt_name);
}
}
}
if self.prepared_statement_cache.is_some() {
let error_message = PgErrorMsg::parse(&message)?;
if error_message.message == "cached plan must not change result type" {
@@ -1088,11 +1068,6 @@ impl Server {
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
'c' => (),
// Parse complete successfully
'1' => {
self.registering_prepared_statement.pop_front();
}
// Anything else, e.g. errors, notices, etc.
// Keep buffering until ReadyForQuery shows up.
_ => (),
@@ -1132,7 +1107,7 @@ impl Server {
has_it
}
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
pub fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
None => return None,
@@ -1154,7 +1129,7 @@ impl Server {
None
}
fn remove_prepared_statement_from_cache(&mut self, name: &str) {
pub fn remove_prepared_statement_from_cache(&mut self, name: &str) {
let cache = match &mut self.prepared_statement_cache {
Some(cache) => cache,
None => return,
@@ -1170,9 +1145,6 @@ impl Server {
should_send_parse_to_server: bool,
) -> Result<(), Error> {
if !self.has_prepared_statement(&parse.name) {
self.registering_prepared_statement
.push_back(parse.name.clone());
let mut bytes = BytesMut::new();
if should_send_parse_to_server {
@@ -1204,13 +1176,7 @@ impl Server {
}
};
// If it's not there, something went bad, I'm guessing bad syntax or permissions error
// on the server.
if !self.has_prepared_statement(&parse.name) {
Err(Error::PreparedStatementError)
} else {
Ok(())
}
Ok(())
}
/// If the server is still inside a transaction.
@@ -1220,7 +1186,6 @@ impl Server {
self.in_transaction
}
/// Currently copying data from client to server or vice-versa.
pub fn in_copy_mode(&self) -> bool {
self.in_copy_mode
}

View File

@@ -38,10 +38,8 @@ pub struct ClientStats {
/// Total time spent waiting for a connection from pool, measures in microseconds
pub total_wait_time: Arc<AtomicU64>,
/// When this client started waiting.
/// Stored as microseconds since connect_time so it can fit in an AtomicU64 instead
/// of us using an "AtomicInstant"
pub wait_start: Arc<AtomicU64>,
/// Maximum time spent waiting for a connection from pool, measures in microseconds
pub max_wait_time: Arc<AtomicU64>,
/// Current state of the client
pub state: Arc<AtomicClientState>,
@@ -65,7 +63,7 @@ impl Default for ClientStats {
username: String::new(),
pool_name: String::new(),
total_wait_time: Arc::new(AtomicU64::new(0)),
wait_start: Arc::new(AtomicU64::new(0)),
max_wait_time: Arc::new(AtomicU64::new(0)),
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
transaction_count: Arc::new(AtomicU64::new(0)),
query_count: Arc::new(AtomicU64::new(0)),
@@ -113,11 +111,6 @@ impl ClientStats {
/// Reports a client is waiting for a connection
pub fn waiting(&self) {
// safe to truncate, we only lose info if duration is greater than ~585,000 years
self.wait_start.store(
Instant::now().duration_since(self.connect_time).as_micros() as u64,
Ordering::Relaxed,
);
self.state.store(ClientState::Waiting, Ordering::Relaxed);
}
@@ -141,6 +134,8 @@ impl ClientStats {
pub fn checkout_time(&self, microseconds: u64) {
self.total_wait_time
.fetch_add(microseconds, Ordering::Relaxed);
self.max_wait_time
.fetch_max(microseconds, Ordering::Relaxed);
}
/// Report a query executed by a client against a server

View File

@@ -4,7 +4,6 @@ use super::{ClientState, ServerState};
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
use std::collections::HashMap;
use std::sync::atomic::*;
use tokio::time::Instant;
use crate::pool::get_all_pools;
@@ -54,7 +53,6 @@ impl PoolStats {
);
}
let now = Instant::now();
for client in client_map.values() {
match map.get_mut(&PoolIdentifier {
db: client.pool_name(),
@@ -64,16 +62,10 @@ impl PoolStats {
match client.state.load(Ordering::Relaxed) {
ClientState::Active => pool_stats.cl_active += 1,
ClientState::Idle => pool_stats.cl_idle += 1,
ClientState::Waiting => {
pool_stats.cl_waiting += 1;
// wait_start is measured as microseconds since connect_time
// so compute wait_time as (now() - connect_time) - (wait_start - connect_time)
let duration_since_connect = now.duration_since(client.connect_time());
let wait_time = (duration_since_connect.as_micros() as u64)
- client.wait_start.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time);
}
ClientState::Waiting => pool_stats.cl_waiting += 1,
}
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
}
None => debug!("Client from an obselete pool"),
}

View File

@@ -8,6 +8,3 @@ RUN rustup component add llvm-tools-preview
RUN sudo gem install bundler
RUN wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
sudo dpkg -i toxiproxy-2.4.0.deb
RUN wget -O go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
sudo tar -C /usr/local -xzf go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
rm go1.21.3.linux-$(dpkg --print-architecture).tar.gz

View File

@@ -1,5 +0,0 @@
module pgcat
go 1.21
require github.com/lib/pq v1.10.9

View File

@@ -1,2 +0,0 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

View File

@@ -1,162 +0,0 @@
#
# PgCat config example.
#
#
# General pooler settings
[general]
# What IP to run on, 0.0.0.0 means accessible from everywhere.
host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example.
port = "${PORT}"
# Whether to enable prometheus exporter or not.
enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms).
connect_timeout = 1000
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000
# How long to keep connection available for immediate re-use, without running a healthcheck query on it
healthcheck_delay = 30000
# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 5000
# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds
# If we should log client connections
log_client_connections = false
# If we should log client disconnections
log_client_disconnections = false
# Reload config automatically if it changes.
autoreload = 15000
server_round_robin = false
# TLS
tls_certificate = "../../.circleci/server.cert"
tls_private_key = "../../.circleci/server.key"
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
admin_username = "admin_user"
admin_password = "admin_pass"
# pool
# configs are structured as pool.<pool_name>
# the pool_name is what clients use as database name when connecting
# For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded_db"
[pools.sharded_db]
# Pool mode (see PgBouncer docs for more).
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"
# Query parser. If enabled, we'll attempt to parse
# every incoming query to determine if it's a read or a write.
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
# we'll direct it to the primary.
query_parser_enabled = true
# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
primary_reads_enabled = true
# So what if you wanted to implement a different hashing function,
# or you've already built one and you want this pooler to use it?
#
# Current options:
#
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"
# Prepared statements cache size.
prepared_statements_cache_size = 500
# Credentials for users that may connect to this cluster
[pools.sharded_db.users.0]
username = "sharding_user"
password = "sharding_user"
# Maximum number of server connections that can be established for this user
# The maximum number of connection from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users.
pool_size = 5
statement_timeout = 0
[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 30000
# Shard 0
[pools.sharded_db.shards.0]
# [ host, port, role ]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
# Database name (e.g. "postgres")
database = "shard0"
[pools.sharded_db.shards.1]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard1"
[pools.sharded_db.shards.2]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ],
]
database = "shard2"
[pools.simple_db]
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"
[pools.simple_db.users.0]
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 30000
[pools.simple_db.shards.0]
servers = [
[ "127.0.0.1", 5432, "primary" ],
[ "localhost", 5432, "replica" ]
]
database = "some_db"

View File

@@ -1,52 +0,0 @@
package pgcat
import (
"context"
"database/sql"
"fmt"
_ "github.com/lib/pq"
"testing"
)
func Test(t *testing.T) {
t.Cleanup(setup(t))
t.Run("Named parameterized prepared statement works", namedParameterizedPreparedStatement)
t.Run("Unnamed parameterized prepared statement works", unnamedParameterizedPreparedStatement)
}
func namedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
stmt, err := db.Prepare("SELECT $1")
if err != nil {
t.Fatalf("could not prepare: %+v", err)
}
for i := 0; i < 100; i++ {
rows, err := stmt.Query(1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}
func unnamedParameterizedPreparedStatement(t *testing.T) {
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
if err != nil {
t.Fatalf("could not open connection: %+v", err)
}
for i := 0; i < 100; i++ {
// Under the hood QueryContext generates an unnamed parameterized prepared statement
rows, err := db.QueryContext(context.Background(), "SELECT $1", 1)
if err != nil {
t.Fatalf("could not query: %+v", err)
}
_ = rows.Close()
}
}

View File

@@ -1,81 +0,0 @@
package pgcat
import (
"context"
"database/sql"
_ "embed"
"fmt"
"math/rand"
"os"
"os/exec"
"strings"
"testing"
"time"
)
//go:embed pgcat.toml
var pgcatCfg string
var port = rand.Intn(32760-20000) + 20000
func setup(t *testing.T) func() {
cfg, err := os.CreateTemp("/tmp", "pgcat_cfg_*.toml")
if err != nil {
t.Fatalf("could not create temp file: %+v", err)
}
pgcatCfg = strings.Replace(pgcatCfg, "\"${PORT}\"", fmt.Sprintf("%d", port), 1)
_, err = cfg.Write([]byte(pgcatCfg))
if err != nil {
t.Fatalf("could not write temp file: %+v", err)
}
commandPath := "../../target/debug/pgcat"
if os.Getenv("CARGO_TARGET_DIR") != "" {
commandPath = os.Getenv("CARGO_TARGET_DIR") + "/debug/pgcat"
}
cmd := exec.Command(commandPath, cfg.Name())
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
go func() {
err = cmd.Run()
if err != nil {
t.Errorf("could not run pgcat: %+v", err)
}
}()
deadline, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancelFunc()
for {
select {
case <-deadline.Done():
break
case <-time.After(50 * time.Millisecond):
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=pgcat user=admin_user password=admin_pass sslmode=disable", port))
if err != nil {
continue
}
rows, err := db.QueryContext(deadline, "SHOW STATS")
if err != nil {
continue
}
_ = rows.Close()
_ = db.Close()
break
}
break
}
return func() {
err := cmd.Process.Signal(os.Interrupt)
if err != nil {
t.Fatalf("could not interrupt pgcat: %+v", err)
}
err = os.Remove(cfg.Name())
if err != nil {
t.Fatalf("could not remove temp file: %+v", err)
}
}
}

View File

@@ -233,7 +233,7 @@ describe "Stats" do
sleep(1.1) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
end
@@ -260,20 +260,12 @@ describe "Stats" do
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
# two connections waiting => they report wait time
sleep(1.1) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(100_000)
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]
# no connections waiting => no reported wait time
expect(results["maxwait"]).to eq("0")
expect(results["maxwait_us"]).to eq("0")
expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
connections.map(&:close)
sleep(4.5) # Allow time for stats to update
@@ -337,40 +329,6 @@ describe "Stats" do
admin_conn.close
connections.map(&:close)
end
context "when client has waited for a server" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
it "shows correct maxwait" do
threads = []
connections = Array.new(3) { |i| PG::connect("#{pgcat_conn_str}?application_name=app#{i}") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
end
sleep(2.5) # Allow time for stats to update
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW CLIENTS")
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
non_waiting_clients = normal_client_results.select { |c| c["maxwait"] == "0" }
waiting_clients = normal_client_results.select { |c| c["maxwait"].to_i > 0 }
expect(non_waiting_clients.count).to eq(2)
non_waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_between(0, 50_000)
end
expect(waiting_clients.count).to eq(1)
waiting_clients.each do |client|
expect(client["maxwait_us"].to_i).to be_within(200_000).of(500_000)
end
admin_conn.close
connections.map(&:close)
end
end
end

View File

@@ -16,14 +16,7 @@ async fn test_prepared_statements() {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for _ in 0..1000 {
match sqlx::query("SELECT one").fetch_all(&pool).await {
Ok(_) => (),
Err(err) => {
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err);
}
}
}
sqlx::query("SELECT 1").fetch_all(&pool).await.unwrap();
}
});

View File

@@ -22,7 +22,7 @@ mkdir -p "$deb_dir/etc/systemd/system"
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
chmod +x "$deb_dir/usr/bin/pgcat"
cp pgcat.toml "$deb_dir/etc/pgcat.example.toml"
cp pgcat.toml "$deb_dir/etc/pgcat.toml"
cp pgcat.service "$deb_dir/etc/systemd/system/pgcat.service"
(cat control | envsubst) > "$deb_dir/DEBIAN/control"