mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 02:36:29 +00:00
Compare commits
16 Commits
levkk-bump
...
levkk-max-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7a419f40ea | ||
|
|
54c4ad140d | ||
|
|
190e32ae85 | ||
|
|
ec3920d60f | ||
|
|
4c5498b915 | ||
|
|
0e8064b049 | ||
|
|
4dbef49ec9 | ||
|
|
bc07dc9c81 | ||
|
|
9b8166b313 | ||
|
|
e58d69f3de | ||
|
|
e76d720ffb | ||
|
|
998cc16a3c | ||
|
|
7c37da2fad | ||
|
|
b45c6b1d23 | ||
|
|
dae240d30c | ||
|
|
b52ea8e7f1 |
@@ -108,8 +108,24 @@ cd ../..
|
|||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
python3 tests/python/tests.py || exit 1
|
python3 tests/python/tests.py || exit 1
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# Go tests
|
||||||
|
# Starts its own pgcat server
|
||||||
|
#
|
||||||
|
pushd tests/go
|
||||||
|
/usr/local/go/bin/go test || exit 1
|
||||||
|
popd
|
||||||
|
|
||||||
start_pgcat "info"
|
start_pgcat "info"
|
||||||
|
|
||||||
|
#
|
||||||
|
# Rust tests
|
||||||
|
#
|
||||||
|
cd tests/rust
|
||||||
|
cargo run
|
||||||
|
cd ../../
|
||||||
|
|
||||||
# Admin tests
|
# Admin tests
|
||||||
export PGPASSWORD=admin_pass
|
export PGPASSWORD=admin_pass
|
||||||
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
|
||||||
|
|||||||
2
.github/workflows/publish-deb-package.yml
vendored
2
.github/workflows/publish-deb-package.yml
vendored
@@ -4,7 +4,7 @@ on:
|
|||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
inputs:
|
inputs:
|
||||||
packageVersion:
|
packageVersion:
|
||||||
default: "1.1.2-dev"
|
default: "1.1.2-dev1"
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
strategy:
|
strategy:
|
||||||
|
|||||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1020,7 +1020,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.1.2-dev"
|
version = "1.1.2-dev4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "1.1.2-dev"
|
version = "1.1.2-dev4"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|||||||
@@ -9,6 +9,9 @@ RUN sudo apt-get update && \
|
|||||||
sudo apt-get upgrade curl && \
|
sudo apt-get upgrade curl && \
|
||||||
cargo install cargo-binutils rustfilt && \
|
cargo install cargo-binutils rustfilt && \
|
||||||
rustup component add llvm-tools-preview && \
|
rustup component add llvm-tools-preview && \
|
||||||
pip3 install psycopg2 && sudo gem install bundler && \
|
pip3 install psycopg2 && sudo gem install bundler && \
|
||||||
wget -O /tmp/toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
|
wget -O /tmp/toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
|
||||||
sudo dpkg -i /tmp/toxiproxy-2.4.0.deb
|
sudo dpkg -i /tmp/toxiproxy-2.4.0.deb
|
||||||
|
RUN wget -O /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||||
|
sudo tar -C /usr/local -xzf /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||||
|
rm /tmp/go1.21.3.linux-$(dpkg --print-architecture).tar.gz
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
|
|||||||
</a>
|
</a>
|
||||||
</td>
|
</td>
|
||||||
<td>
|
<td>
|
||||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
|
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
|
||||||
<img src="./images/postgresml.webp" height="70" width="auto">
|
<img src="./images/postgresml.webp" height="70" width="auto">
|
||||||
</a>
|
</a>
|
||||||
</td>
|
</td>
|
||||||
@@ -57,7 +57,7 @@ PgCat is stable and used in production to serve hundreds of thousands of queries
|
|||||||
</a>
|
</a>
|
||||||
</td>
|
</td>
|
||||||
<td>
|
<td>
|
||||||
<a href="https://postgresml.org/blog/scaling-postgresml-to-one-million-requests-per-second">
|
<a href="https://postgresml.org/blog/scaling-postgresml-to-1-million-requests-per-second">
|
||||||
PostgresML
|
PostgresML
|
||||||
</a>
|
</a>
|
||||||
</td>
|
</td>
|
||||||
|
|||||||
@@ -301,6 +301,8 @@ username = "other_user"
|
|||||||
password = "other_user"
|
password = "other_user"
|
||||||
pool_size = 21
|
pool_size = 21
|
||||||
statement_timeout = 15000
|
statement_timeout = 15000
|
||||||
|
connect_timeout = 1000
|
||||||
|
idle_timeout = 1000
|
||||||
|
|
||||||
# Shard configs are structured as pool.<pool_name>.shards.<shard_id>
|
# Shard configs are structured as pool.<pool_name>.shards.<shard_id>
|
||||||
# Each shard config contains a list of servers that make up the shard
|
# Each shard config contains a list of servers that make up the shard
|
||||||
|
|||||||
@@ -690,6 +690,8 @@ where
|
|||||||
("query_count", DataType::Numeric),
|
("query_count", DataType::Numeric),
|
||||||
("error_count", DataType::Numeric),
|
("error_count", DataType::Numeric),
|
||||||
("age_seconds", DataType::Numeric),
|
("age_seconds", DataType::Numeric),
|
||||||
|
("maxwait", DataType::Numeric),
|
||||||
|
("maxwait_us", DataType::Numeric),
|
||||||
];
|
];
|
||||||
|
|
||||||
let new_map = get_client_stats();
|
let new_map = get_client_stats();
|
||||||
@@ -697,6 +699,7 @@ where
|
|||||||
res.put(row_description(&columns));
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
for (_, client) in new_map {
|
for (_, client) in new_map {
|
||||||
|
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
|
||||||
let row = vec![
|
let row = vec![
|
||||||
format!("{:#010X}", client.client_id()),
|
format!("{:#010X}", client.client_id()),
|
||||||
client.pool_name(),
|
client.pool_name(),
|
||||||
@@ -710,6 +713,8 @@ where
|
|||||||
.duration_since(client.connect_time())
|
.duration_since(client.connect_time())
|
||||||
.as_secs()
|
.as_secs()
|
||||||
.to_string(),
|
.to_string(),
|
||||||
|
(max_wait / 1_000_000).to_string(),
|
||||||
|
(max_wait % 1_000_000).to_string(),
|
||||||
];
|
];
|
||||||
|
|
||||||
res.put(data_row(&row));
|
res.put(data_row(&row));
|
||||||
|
|||||||
@@ -79,6 +79,8 @@ impl AuthPassthrough {
|
|||||||
pool_mode: None,
|
pool_mode: None,
|
||||||
server_lifetime: None,
|
server_lifetime: None,
|
||||||
min_pool_size: None,
|
min_pool_size: None,
|
||||||
|
connect_timeout: None,
|
||||||
|
idle_timeout: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let user = &address.username;
|
let user = &address.username;
|
||||||
|
|||||||
@@ -1149,7 +1149,7 @@ where
|
|||||||
// This reads the first byte without advancing the internal pointer and mutating the bytes
|
// This reads the first byte without advancing the internal pointer and mutating the bytes
|
||||||
let code = *message.first().unwrap() as char;
|
let code = *message.first().unwrap() as char;
|
||||||
|
|
||||||
trace!("Message: {}", code);
|
trace!("Client message: {}", code);
|
||||||
|
|
||||||
match code {
|
match code {
|
||||||
// Query
|
// Query
|
||||||
@@ -1188,6 +1188,7 @@ where
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Sending query to server");
|
debug!("Sending query to server");
|
||||||
|
|
||||||
self.send_and_receive_loop(
|
self.send_and_receive_loop(
|
||||||
@@ -1320,6 +1321,7 @@ where
|
|||||||
{
|
{
|
||||||
match protocol_data {
|
match protocol_data {
|
||||||
ExtendedProtocolData::Parse { data, metadata } => {
|
ExtendedProtocolData::Parse { data, metadata } => {
|
||||||
|
debug!("Have parse in extended buffer");
|
||||||
let (parse, hash) = match metadata {
|
let (parse, hash) = match metadata {
|
||||||
Some(metadata) => metadata,
|
Some(metadata) => metadata,
|
||||||
None => {
|
None => {
|
||||||
@@ -1656,11 +1658,25 @@ where
|
|||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
match self.prepared_statements.get(&client_name) {
|
match self.prepared_statements.get(&client_name) {
|
||||||
Some((parse, hash)) => {
|
Some((parse, hash)) => {
|
||||||
debug!("Prepared statement `{}` found in cache", parse.name);
|
debug!("Prepared statement `{}` found in cache", client_name);
|
||||||
// In this case we want to send the parse message to the server
|
// In this case we want to send the parse message to the server
|
||||||
// since pgcat is initiating the prepared statement on this specific server
|
// since pgcat is initiating the prepared statement on this specific server
|
||||||
self.register_parse_to_server_cache(true, hash, parse, pool, server, address)
|
match self
|
||||||
.await?;
|
.register_parse_to_server_cache(true, hash, parse, pool, server, address)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => match err {
|
||||||
|
Error::PreparedStatementError => {
|
||||||
|
debug!("Removed {} from client cache", client_name);
|
||||||
|
self.prepared_statements.remove(&client_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => {
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
None => {
|
None => {
|
||||||
@@ -1689,11 +1705,20 @@ where
|
|||||||
// We want to promote this in the pool's LRU
|
// We want to promote this in the pool's LRU
|
||||||
pool.promote_prepared_statement_hash(hash);
|
pool.promote_prepared_statement_hash(hash);
|
||||||
|
|
||||||
|
debug!("Checking for prepared statement {}", parse.name);
|
||||||
|
|
||||||
if let Err(err) = server
|
if let Err(err) = server
|
||||||
.register_prepared_statement(parse, should_send_parse_to_server)
|
.register_prepared_statement(parse, should_send_parse_to_server)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
|
match err {
|
||||||
|
// Don't ban for this.
|
||||||
|
Error::PreparedStatementError => (),
|
||||||
|
_ => {
|
||||||
|
pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1704,18 +1729,14 @@ where
|
|||||||
/// and also the pool's statement cache. Add it to extended protocol data.
|
/// and also the pool's statement cache. Add it to extended protocol data.
|
||||||
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
// Avoid parsing if prepared statements not enabled
|
// Avoid parsing if prepared statements not enabled
|
||||||
let client_given_name = match self.prepared_statements_enabled {
|
if !self.prepared_statements_enabled {
|
||||||
true => Parse::get_name(&message)?,
|
|
||||||
false => "".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
if client_given_name.is_empty() {
|
|
||||||
debug!("Anonymous parse message");
|
debug!("Anonymous parse message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let client_given_name = Parse::get_name(&message)?;
|
||||||
let parse: Parse = (&message).try_into()?;
|
let parse: Parse = (&message).try_into()?;
|
||||||
|
|
||||||
// Compute the hash of the parse statement
|
// Compute the hash of the parse statement
|
||||||
@@ -1753,18 +1774,15 @@ where
|
|||||||
/// saved in the client cache.
|
/// saved in the client cache.
|
||||||
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
||||||
// Avoid parsing if prepared statements not enabled
|
// Avoid parsing if prepared statements not enabled
|
||||||
let client_given_name = match self.prepared_statements_enabled {
|
if !self.prepared_statements_enabled {
|
||||||
true => Bind::get_name(&message)?,
|
|
||||||
false => "".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
if client_given_name.is_empty() {
|
|
||||||
debug!("Anonymous bind message");
|
debug!("Anonymous bind message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let client_given_name = Bind::get_name(&message)?;
|
||||||
|
|
||||||
match self.prepared_statements.get(&client_given_name) {
|
match self.prepared_statements.get(&client_given_name) {
|
||||||
Some((rewritten_parse, _)) => {
|
Some((rewritten_parse, _)) => {
|
||||||
let message = Bind::rename(message, &rewritten_parse.name)?;
|
let message = Bind::rename(message, &rewritten_parse.name)?;
|
||||||
@@ -1807,12 +1825,7 @@ where
|
|||||||
/// saved in the client cache.
|
/// saved in the client cache.
|
||||||
async fn buffer_describe(&mut self, message: BytesMut) -> Result<(), Error> {
|
async fn buffer_describe(&mut self, message: BytesMut) -> Result<(), Error> {
|
||||||
// Avoid parsing if prepared statements not enabled
|
// Avoid parsing if prepared statements not enabled
|
||||||
let describe: Describe = match self.prepared_statements_enabled {
|
if !self.prepared_statements_enabled {
|
||||||
true => (&message).try_into()?,
|
|
||||||
false => Describe::empty_new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
if describe.anonymous() {
|
|
||||||
debug!("Anonymous describe message");
|
debug!("Anonymous describe message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
||||||
@@ -1820,6 +1833,15 @@ where
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let describe: Describe = (&message).try_into()?;
|
||||||
|
if describe.target == 'P' {
|
||||||
|
debug!("Portal describe message");
|
||||||
|
self.extended_protocol_data_buffer
|
||||||
|
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let client_given_name = describe.statement_name.clone();
|
let client_given_name = describe.statement_name.clone();
|
||||||
|
|
||||||
match self.prepared_statements.get(&client_given_name) {
|
match self.prepared_statements.get(&client_given_name) {
|
||||||
|
|||||||
@@ -216,6 +216,8 @@ pub struct User {
|
|||||||
pub server_lifetime: Option<u64>,
|
pub server_lifetime: Option<u64>,
|
||||||
#[serde(default)] // 0
|
#[serde(default)] // 0
|
||||||
pub statement_timeout: u64,
|
pub statement_timeout: u64,
|
||||||
|
pub connect_timeout: Option<u64>,
|
||||||
|
pub idle_timeout: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for User {
|
impl Default for User {
|
||||||
@@ -230,6 +232,8 @@ impl Default for User {
|
|||||||
statement_timeout: 0,
|
statement_timeout: 0,
|
||||||
pool_mode: None,
|
pool_mode: None,
|
||||||
server_lifetime: None,
|
server_lifetime: None,
|
||||||
|
connect_timeout: None,
|
||||||
|
idle_timeout: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1307,6 +1311,24 @@ impl Config {
|
|||||||
None => "default".to_string(),
|
None => "default".to_string(),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
info!(
|
||||||
|
"[pool: {}][user: {}] Connection timeout: {}",
|
||||||
|
pool_name,
|
||||||
|
user.1.username,
|
||||||
|
match user.1.connect_timeout {
|
||||||
|
Some(connect_timeout) => format!("{}ms", connect_timeout),
|
||||||
|
None => "not set".to_string(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
info!(
|
||||||
|
"[pool: {}][user: {}] Idle timeout: {}",
|
||||||
|
pool_name,
|
||||||
|
user.1.username,
|
||||||
|
match user.1.idle_timeout {
|
||||||
|
Some(idle_timeout) => format!("{}ms", idle_timeout),
|
||||||
|
None => "not set".to_string(),
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ pub enum Error {
|
|||||||
QueryRouterParserError(String),
|
QueryRouterParserError(String),
|
||||||
QueryRouterError(String),
|
QueryRouterError(String),
|
||||||
InvalidShardId(usize),
|
InvalidShardId(usize),
|
||||||
|
PreparedStatementError,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Debug)]
|
#[derive(Clone, PartialEq, Debug)]
|
||||||
|
|||||||
@@ -1109,7 +1109,7 @@ pub struct Describe {
|
|||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
len: i32,
|
len: i32,
|
||||||
target: char,
|
pub target: char,
|
||||||
pub statement_name: String,
|
pub statement_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
14
src/pool.rs
14
src/pool.rs
@@ -436,14 +436,20 @@ impl ConnectionPool {
|
|||||||
pool_config.prepared_statements_cache_size,
|
pool_config.prepared_statements_cache_size,
|
||||||
);
|
);
|
||||||
|
|
||||||
let connect_timeout = match pool_config.connect_timeout {
|
let connect_timeout = match user.connect_timeout {
|
||||||
Some(connect_timeout) => connect_timeout,
|
Some(connect_timeout) => connect_timeout,
|
||||||
None => config.general.connect_timeout,
|
None => match pool_config.connect_timeout {
|
||||||
|
Some(connect_timeout) => connect_timeout,
|
||||||
|
None => config.general.connect_timeout,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let idle_timeout = match pool_config.idle_timeout {
|
let idle_timeout = match user.idle_timeout {
|
||||||
Some(idle_timeout) => idle_timeout,
|
Some(idle_timeout) => idle_timeout,
|
||||||
None => config.general.idle_timeout,
|
None => match pool_config.idle_timeout {
|
||||||
|
Some(idle_timeout) => idle_timeout,
|
||||||
|
None => config.general.idle_timeout,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let server_lifetime = match user.server_lifetime {
|
let server_lifetime = match user.server_lifetime {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use lru::LruCache;
|
|||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use postgres_protocol::message;
|
use postgres_protocol::message;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
@@ -325,6 +325,9 @@ pub struct Server {
|
|||||||
|
|
||||||
/// Prepared statements
|
/// Prepared statements
|
||||||
prepared_statement_cache: Option<LruCache<String, ()>>,
|
prepared_statement_cache: Option<LruCache<String, ()>>,
|
||||||
|
|
||||||
|
/// Prepared statement being currently registered on the server.
|
||||||
|
registering_prepared_statement: VecDeque<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
@@ -827,6 +830,7 @@ impl Server {
|
|||||||
NonZeroUsize::new(prepared_statement_cache_size).unwrap(),
|
NonZeroUsize::new(prepared_statement_cache_size).unwrap(),
|
||||||
)),
|
)),
|
||||||
},
|
},
|
||||||
|
registering_prepared_statement: VecDeque::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
return Ok(server);
|
return Ok(server);
|
||||||
@@ -956,7 +960,6 @@ impl Server {
|
|||||||
|
|
||||||
// There is no more data available from the server.
|
// There is no more data available from the server.
|
||||||
self.data_available = false;
|
self.data_available = false;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -966,6 +969,23 @@ impl Server {
|
|||||||
self.in_copy_mode = false;
|
self.in_copy_mode = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove the prepared statement from the cache, it has a syntax error or something else bad happened.
|
||||||
|
if let Some(prepared_stmt_name) =
|
||||||
|
self.registering_prepared_statement.pop_front()
|
||||||
|
{
|
||||||
|
if let Some(ref mut cache) = self.prepared_statement_cache {
|
||||||
|
if let Some(_removed) = cache.pop(&prepared_stmt_name) {
|
||||||
|
debug!(
|
||||||
|
"Removed {} from prepared statement cache",
|
||||||
|
prepared_stmt_name
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// Shouldn't happen.
|
||||||
|
debug!("Prepared statement {} was not cached", prepared_stmt_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if self.prepared_statement_cache.is_some() {
|
if self.prepared_statement_cache.is_some() {
|
||||||
let error_message = PgErrorMsg::parse(&message)?;
|
let error_message = PgErrorMsg::parse(&message)?;
|
||||||
if error_message.message == "cached plan must not change result type" {
|
if error_message.message == "cached plan must not change result type" {
|
||||||
@@ -1068,6 +1088,11 @@ impl Server {
|
|||||||
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
|
// Buffer until ReadyForQuery shows up, so don't exit the loop yet.
|
||||||
'c' => (),
|
'c' => (),
|
||||||
|
|
||||||
|
// Parse complete successfully
|
||||||
|
'1' => {
|
||||||
|
self.registering_prepared_statement.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
// Anything else, e.g. errors, notices, etc.
|
// Anything else, e.g. errors, notices, etc.
|
||||||
// Keep buffering until ReadyForQuery shows up.
|
// Keep buffering until ReadyForQuery shows up.
|
||||||
_ => (),
|
_ => (),
|
||||||
@@ -1107,7 +1132,7 @@ impl Server {
|
|||||||
has_it
|
has_it
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
||||||
let cache = match &mut self.prepared_statement_cache {
|
let cache = match &mut self.prepared_statement_cache {
|
||||||
Some(cache) => cache,
|
Some(cache) => cache,
|
||||||
None => return None,
|
None => return None,
|
||||||
@@ -1129,7 +1154,7 @@ impl Server {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_prepared_statement_from_cache(&mut self, name: &str) {
|
fn remove_prepared_statement_from_cache(&mut self, name: &str) {
|
||||||
let cache = match &mut self.prepared_statement_cache {
|
let cache = match &mut self.prepared_statement_cache {
|
||||||
Some(cache) => cache,
|
Some(cache) => cache,
|
||||||
None => return,
|
None => return,
|
||||||
@@ -1145,6 +1170,9 @@ impl Server {
|
|||||||
should_send_parse_to_server: bool,
|
should_send_parse_to_server: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
if !self.has_prepared_statement(&parse.name) {
|
if !self.has_prepared_statement(&parse.name) {
|
||||||
|
self.registering_prepared_statement
|
||||||
|
.push_back(parse.name.clone());
|
||||||
|
|
||||||
let mut bytes = BytesMut::new();
|
let mut bytes = BytesMut::new();
|
||||||
|
|
||||||
if should_send_parse_to_server {
|
if should_send_parse_to_server {
|
||||||
@@ -1176,7 +1204,13 @@ impl Server {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(())
|
// If it's not there, something went bad, I'm guessing bad syntax or permissions error
|
||||||
|
// on the server.
|
||||||
|
if !self.has_prepared_statement(&parse.name) {
|
||||||
|
Err(Error::PreparedStatementError)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If the server is still inside a transaction.
|
/// If the server is still inside a transaction.
|
||||||
@@ -1186,6 +1220,7 @@ impl Server {
|
|||||||
self.in_transaction
|
self.in_transaction
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Currently copying data from client to server or vice-versa.
|
||||||
pub fn in_copy_mode(&self) -> bool {
|
pub fn in_copy_mode(&self) -> bool {
|
||||||
self.in_copy_mode
|
self.in_copy_mode
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,8 +38,10 @@ pub struct ClientStats {
|
|||||||
/// Total time spent waiting for a connection from pool, measures in microseconds
|
/// Total time spent waiting for a connection from pool, measures in microseconds
|
||||||
pub total_wait_time: Arc<AtomicU64>,
|
pub total_wait_time: Arc<AtomicU64>,
|
||||||
|
|
||||||
/// Maximum time spent waiting for a connection from pool, measures in microseconds
|
/// When this client started waiting.
|
||||||
pub max_wait_time: Arc<AtomicU64>,
|
/// Stored as microseconds since connect_time so it can fit in an AtomicU64 instead
|
||||||
|
/// of us using an "AtomicInstant"
|
||||||
|
pub wait_start: Arc<AtomicU64>,
|
||||||
|
|
||||||
/// Current state of the client
|
/// Current state of the client
|
||||||
pub state: Arc<AtomicClientState>,
|
pub state: Arc<AtomicClientState>,
|
||||||
@@ -63,7 +65,7 @@ impl Default for ClientStats {
|
|||||||
username: String::new(),
|
username: String::new(),
|
||||||
pool_name: String::new(),
|
pool_name: String::new(),
|
||||||
total_wait_time: Arc::new(AtomicU64::new(0)),
|
total_wait_time: Arc::new(AtomicU64::new(0)),
|
||||||
max_wait_time: Arc::new(AtomicU64::new(0)),
|
wait_start: Arc::new(AtomicU64::new(0)),
|
||||||
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
|
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
|
||||||
transaction_count: Arc::new(AtomicU64::new(0)),
|
transaction_count: Arc::new(AtomicU64::new(0)),
|
||||||
query_count: Arc::new(AtomicU64::new(0)),
|
query_count: Arc::new(AtomicU64::new(0)),
|
||||||
@@ -111,6 +113,11 @@ impl ClientStats {
|
|||||||
|
|
||||||
/// Reports a client is waiting for a connection
|
/// Reports a client is waiting for a connection
|
||||||
pub fn waiting(&self) {
|
pub fn waiting(&self) {
|
||||||
|
// safe to truncate, we only lose info if duration is greater than ~585,000 years
|
||||||
|
self.wait_start.store(
|
||||||
|
Instant::now().duration_since(self.connect_time).as_micros() as u64,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
self.state.store(ClientState::Waiting, Ordering::Relaxed);
|
self.state.store(ClientState::Waiting, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,8 +141,6 @@ impl ClientStats {
|
|||||||
pub fn checkout_time(&self, microseconds: u64) {
|
pub fn checkout_time(&self, microseconds: u64) {
|
||||||
self.total_wait_time
|
self.total_wait_time
|
||||||
.fetch_add(microseconds, Ordering::Relaxed);
|
.fetch_add(microseconds, Ordering::Relaxed);
|
||||||
self.max_wait_time
|
|
||||||
.fetch_max(microseconds, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a query executed by a client against a server
|
/// Report a query executed by a client against a server
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use super::{ClientState, ServerState};
|
|||||||
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
|
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::atomic::*;
|
use std::sync::atomic::*;
|
||||||
|
use tokio::time::Instant;
|
||||||
|
|
||||||
use crate::pool::get_all_pools;
|
use crate::pool::get_all_pools;
|
||||||
|
|
||||||
@@ -53,6 +54,7 @@ impl PoolStats {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
for client in client_map.values() {
|
for client in client_map.values() {
|
||||||
match map.get_mut(&PoolIdentifier {
|
match map.get_mut(&PoolIdentifier {
|
||||||
db: client.pool_name(),
|
db: client.pool_name(),
|
||||||
@@ -62,10 +64,16 @@ impl PoolStats {
|
|||||||
match client.state.load(Ordering::Relaxed) {
|
match client.state.load(Ordering::Relaxed) {
|
||||||
ClientState::Active => pool_stats.cl_active += 1,
|
ClientState::Active => pool_stats.cl_active += 1,
|
||||||
ClientState::Idle => pool_stats.cl_idle += 1,
|
ClientState::Idle => pool_stats.cl_idle += 1,
|
||||||
ClientState::Waiting => pool_stats.cl_waiting += 1,
|
ClientState::Waiting => {
|
||||||
|
pool_stats.cl_waiting += 1;
|
||||||
|
// wait_start is measured as microseconds since connect_time
|
||||||
|
// so compute wait_time as (now() - connect_time) - (wait_start - connect_time)
|
||||||
|
let duration_since_connect = now.duration_since(client.connect_time());
|
||||||
|
let wait_time = (duration_since_connect.as_micros() as u64)
|
||||||
|
- client.wait_start.load(Ordering::Relaxed);
|
||||||
|
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
|
|
||||||
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
|
|
||||||
}
|
}
|
||||||
None => debug!("Client from an obselete pool"),
|
None => debug!("Client from an obselete pool"),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,3 +8,6 @@ RUN rustup component add llvm-tools-preview
|
|||||||
RUN sudo gem install bundler
|
RUN sudo gem install bundler
|
||||||
RUN wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
|
RUN wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb && \
|
||||||
sudo dpkg -i toxiproxy-2.4.0.deb
|
sudo dpkg -i toxiproxy-2.4.0.deb
|
||||||
|
RUN wget -O go1.21.3.linux-$(dpkg --print-architecture).tar.gz https://go.dev/dl/go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||||
|
sudo tar -C /usr/local -xzf go1.21.3.linux-$(dpkg --print-architecture).tar.gz && \
|
||||||
|
rm go1.21.3.linux-$(dpkg --print-architecture).tar.gz
|
||||||
|
|||||||
5
tests/go/go.mod
Normal file
5
tests/go/go.mod
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
module pgcat
|
||||||
|
|
||||||
|
go 1.21
|
||||||
|
|
||||||
|
require github.com/lib/pq v1.10.9
|
||||||
2
tests/go/go.sum
Normal file
2
tests/go/go.sum
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
162
tests/go/pgcat.toml
Normal file
162
tests/go/pgcat.toml
Normal file
@@ -0,0 +1,162 @@
|
|||||||
|
#
|
||||||
|
# PgCat config example.
|
||||||
|
#
|
||||||
|
|
||||||
|
#
|
||||||
|
# General pooler settings
|
||||||
|
[general]
|
||||||
|
# What IP to run on, 0.0.0.0 means accessible from everywhere.
|
||||||
|
host = "0.0.0.0"
|
||||||
|
|
||||||
|
# Port to run on, same as PgBouncer used in this example.
|
||||||
|
port = "${PORT}"
|
||||||
|
|
||||||
|
# Whether to enable prometheus exporter or not.
|
||||||
|
enable_prometheus_exporter = true
|
||||||
|
|
||||||
|
# Port at which prometheus exporter listens on.
|
||||||
|
prometheus_exporter_port = 9930
|
||||||
|
|
||||||
|
# How long to wait before aborting a server connection (ms).
|
||||||
|
connect_timeout = 1000
|
||||||
|
|
||||||
|
# How much time to give the health check query to return with a result (ms).
|
||||||
|
healthcheck_timeout = 1000
|
||||||
|
|
||||||
|
# How long to keep connection available for immediate re-use, without running a healthcheck query on it
|
||||||
|
healthcheck_delay = 30000
|
||||||
|
|
||||||
|
# How much time to give clients during shutdown before forcibly killing client connections (ms).
|
||||||
|
shutdown_timeout = 5000
|
||||||
|
|
||||||
|
# For how long to ban a server if it fails a health check (seconds).
|
||||||
|
ban_time = 60 # Seconds
|
||||||
|
|
||||||
|
# If we should log client connections
|
||||||
|
log_client_connections = false
|
||||||
|
|
||||||
|
# If we should log client disconnections
|
||||||
|
log_client_disconnections = false
|
||||||
|
|
||||||
|
# Reload config automatically if it changes.
|
||||||
|
autoreload = 15000
|
||||||
|
|
||||||
|
server_round_robin = false
|
||||||
|
|
||||||
|
# TLS
|
||||||
|
tls_certificate = "../../.circleci/server.cert"
|
||||||
|
tls_private_key = "../../.circleci/server.key"
|
||||||
|
|
||||||
|
# Credentials to access the virtual administrative database (pgbouncer or pgcat)
|
||||||
|
# Connecting to that database allows running commands like `SHOW POOLS`, `SHOW DATABASES`, etc..
|
||||||
|
admin_username = "admin_user"
|
||||||
|
admin_password = "admin_pass"
|
||||||
|
|
||||||
|
# pool
|
||||||
|
# configs are structured as pool.<pool_name>
|
||||||
|
# the pool_name is what clients use as database name when connecting
|
||||||
|
# For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded_db"
|
||||||
|
[pools.sharded_db]
|
||||||
|
# Pool mode (see PgBouncer docs for more).
|
||||||
|
# session: one server connection per connected client
|
||||||
|
# transaction: one server connection per client transaction
|
||||||
|
pool_mode = "transaction"
|
||||||
|
|
||||||
|
# If the client doesn't specify, route traffic to
|
||||||
|
# this role by default.
|
||||||
|
#
|
||||||
|
# any: round-robin between primary and replicas,
|
||||||
|
# replica: round-robin between replicas only without touching the primary,
|
||||||
|
# primary: all queries go to the primary unless otherwise specified.
|
||||||
|
default_role = "any"
|
||||||
|
|
||||||
|
# Query parser. If enabled, we'll attempt to parse
|
||||||
|
# every incoming query to determine if it's a read or a write.
|
||||||
|
# If it's a read query, we'll direct it to a replica. Otherwise, if it's a write,
|
||||||
|
# we'll direct it to the primary.
|
||||||
|
query_parser_enabled = true
|
||||||
|
|
||||||
|
# If the query parser is enabled and this setting is enabled, we'll attempt to
|
||||||
|
# infer the role from the query itself.
|
||||||
|
query_parser_read_write_splitting = true
|
||||||
|
|
||||||
|
# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
|
||||||
|
# load balancing of read queries. Otherwise, the primary will only be used for write
|
||||||
|
# queries. The primary can always be explicitely selected with our custom protocol.
|
||||||
|
primary_reads_enabled = true
|
||||||
|
|
||||||
|
# So what if you wanted to implement a different hashing function,
|
||||||
|
# or you've already built one and you want this pooler to use it?
|
||||||
|
#
|
||||||
|
# Current options:
|
||||||
|
#
|
||||||
|
# pg_bigint_hash: PARTITION BY HASH (Postgres hashing function)
|
||||||
|
# sha1: A hashing function based on SHA1
|
||||||
|
#
|
||||||
|
sharding_function = "pg_bigint_hash"
|
||||||
|
|
||||||
|
# Prepared statements cache size.
|
||||||
|
prepared_statements_cache_size = 500
|
||||||
|
|
||||||
|
# Credentials for users that may connect to this cluster
|
||||||
|
[pools.sharded_db.users.0]
|
||||||
|
username = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
# Maximum number of server connections that can be established for this user
|
||||||
|
# The maximum number of connection from a single Pgcat process to any database in the cluster
|
||||||
|
# is the sum of pool_size across all users.
|
||||||
|
pool_size = 5
|
||||||
|
statement_timeout = 0
|
||||||
|
|
||||||
|
|
||||||
|
[pools.sharded_db.users.1]
|
||||||
|
username = "other_user"
|
||||||
|
password = "other_user"
|
||||||
|
pool_size = 21
|
||||||
|
statement_timeout = 30000
|
||||||
|
|
||||||
|
# Shard 0
|
||||||
|
[pools.sharded_db.shards.0]
|
||||||
|
# [ host, port, role ]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432, "primary" ],
|
||||||
|
[ "localhost", 5432, "replica" ]
|
||||||
|
]
|
||||||
|
# Database name (e.g. "postgres")
|
||||||
|
database = "shard0"
|
||||||
|
|
||||||
|
[pools.sharded_db.shards.1]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432, "primary" ],
|
||||||
|
[ "localhost", 5432, "replica" ],
|
||||||
|
]
|
||||||
|
database = "shard1"
|
||||||
|
|
||||||
|
[pools.sharded_db.shards.2]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432, "primary" ],
|
||||||
|
[ "localhost", 5432, "replica" ],
|
||||||
|
]
|
||||||
|
database = "shard2"
|
||||||
|
|
||||||
|
|
||||||
|
[pools.simple_db]
|
||||||
|
pool_mode = "session"
|
||||||
|
default_role = "primary"
|
||||||
|
query_parser_enabled = true
|
||||||
|
query_parser_read_write_splitting = true
|
||||||
|
primary_reads_enabled = true
|
||||||
|
sharding_function = "pg_bigint_hash"
|
||||||
|
|
||||||
|
[pools.simple_db.users.0]
|
||||||
|
username = "simple_user"
|
||||||
|
password = "simple_user"
|
||||||
|
pool_size = 5
|
||||||
|
statement_timeout = 30000
|
||||||
|
|
||||||
|
[pools.simple_db.shards.0]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432, "primary" ],
|
||||||
|
[ "localhost", 5432, "replica" ]
|
||||||
|
]
|
||||||
|
database = "some_db"
|
||||||
52
tests/go/prepared_test.go
Normal file
52
tests/go/prepared_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package pgcat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test(t *testing.T) {
|
||||||
|
t.Cleanup(setup(t))
|
||||||
|
t.Run("Named parameterized prepared statement works", namedParameterizedPreparedStatement)
|
||||||
|
t.Run("Unnamed parameterized prepared statement works", unnamedParameterizedPreparedStatement)
|
||||||
|
}
|
||||||
|
|
||||||
|
func namedParameterizedPreparedStatement(t *testing.T) {
|
||||||
|
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not open connection: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := db.Prepare("SELECT $1")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not prepare: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
rows, err := stmt.Query(1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not query: %+v", err)
|
||||||
|
}
|
||||||
|
_ = rows.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func unnamedParameterizedPreparedStatement(t *testing.T) {
|
||||||
|
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=sharded_db user=sharding_user password=sharding_user sslmode=disable", port))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not open connection: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
// Under the hood QueryContext generates an unnamed parameterized prepared statement
|
||||||
|
rows, err := db.QueryContext(context.Background(), "SELECT $1", 1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not query: %+v", err)
|
||||||
|
}
|
||||||
|
_ = rows.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
81
tests/go/setup.go
Normal file
81
tests/go/setup.go
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
package pgcat
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
_ "embed"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed pgcat.toml
|
||||||
|
var pgcatCfg string
|
||||||
|
|
||||||
|
var port = rand.Intn(32760-20000) + 20000
|
||||||
|
|
||||||
|
func setup(t *testing.T) func() {
|
||||||
|
cfg, err := os.CreateTemp("/tmp", "pgcat_cfg_*.toml")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not create temp file: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pgcatCfg = strings.Replace(pgcatCfg, "\"${PORT}\"", fmt.Sprintf("%d", port), 1)
|
||||||
|
|
||||||
|
_, err = cfg.Write([]byte(pgcatCfg))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not write temp file: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
commandPath := "../../target/debug/pgcat"
|
||||||
|
if os.Getenv("CARGO_TARGET_DIR") != "" {
|
||||||
|
commandPath = os.Getenv("CARGO_TARGET_DIR") + "/debug/pgcat"
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := exec.Command(commandPath, cfg.Name())
|
||||||
|
cmd.Stdout = os.Stdout
|
||||||
|
cmd.Stderr = os.Stderr
|
||||||
|
go func() {
|
||||||
|
err = cmd.Run()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("could not run pgcat: %+v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
deadline, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
|
||||||
|
defer cancelFunc()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-deadline.Done():
|
||||||
|
break
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost port=%d database=pgcat user=admin_user password=admin_pass sslmode=disable", port))
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rows, err := db.QueryContext(deadline, "SHOW STATS")
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_ = rows.Close()
|
||||||
|
_ = db.Close()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return func() {
|
||||||
|
err := cmd.Process.Signal(os.Interrupt)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not interrupt pgcat: %+v", err)
|
||||||
|
}
|
||||||
|
err = os.Remove(cfg.Name())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not remove temp file: %+v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -233,7 +233,7 @@ describe "Stats" do
|
|||||||
sleep(1.1) # Allow time for stats to update
|
sleep(1.1) # Allow time for stats to update
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
|
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
|
||||||
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -260,12 +260,20 @@ describe "Stats" do
|
|||||||
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
|
||||||
end
|
end
|
||||||
|
|
||||||
sleep(2.5) # Allow time for stats to update
|
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
|
|
||||||
|
# two connections waiting => they report wait time
|
||||||
|
sleep(1.1) # Allow time for stats to update
|
||||||
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
|
expect(results["maxwait"]).to eq("1")
|
||||||
|
expect(results["maxwait_us"].to_i).to be_within(200_000).of(100_000)
|
||||||
|
|
||||||
|
sleep(2.5) # Allow time for stats to update
|
||||||
results = admin_conn.async_exec("SHOW POOLS")[0]
|
results = admin_conn.async_exec("SHOW POOLS")[0]
|
||||||
|
|
||||||
expect(results["maxwait"]).to eq("1")
|
# no connections waiting => no reported wait time
|
||||||
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
|
expect(results["maxwait"]).to eq("0")
|
||||||
|
expect(results["maxwait_us"]).to eq("0")
|
||||||
connections.map(&:close)
|
connections.map(&:close)
|
||||||
|
|
||||||
sleep(4.5) # Allow time for stats to update
|
sleep(4.5) # Allow time for stats to update
|
||||||
@@ -329,6 +337,40 @@ describe "Stats" do
|
|||||||
admin_conn.close
|
admin_conn.close
|
||||||
connections.map(&:close)
|
connections.map(&:close)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "when client has waited for a server" do
|
||||||
|
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
|
||||||
|
|
||||||
|
it "shows correct maxwait" do
|
||||||
|
threads = []
|
||||||
|
connections = Array.new(3) { |i| PG::connect("#{pgcat_conn_str}?application_name=app#{i}") }
|
||||||
|
connections.each do |c|
|
||||||
|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
|
||||||
|
end
|
||||||
|
|
||||||
|
sleep(2.5) # Allow time for stats to update
|
||||||
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
|
results = admin_conn.async_exec("SHOW CLIENTS")
|
||||||
|
|
||||||
|
normal_client_results = results.reject { |r| r["database"] == "pgcat" }
|
||||||
|
|
||||||
|
non_waiting_clients = normal_client_results.select { |c| c["maxwait"] == "0" }
|
||||||
|
waiting_clients = normal_client_results.select { |c| c["maxwait"].to_i > 0 }
|
||||||
|
|
||||||
|
expect(non_waiting_clients.count).to eq(2)
|
||||||
|
non_waiting_clients.each do |client|
|
||||||
|
expect(client["maxwait_us"].to_i).to be_between(0, 50_000)
|
||||||
|
end
|
||||||
|
|
||||||
|
expect(waiting_clients.count).to eq(1)
|
||||||
|
waiting_clients.each do |client|
|
||||||
|
expect(client["maxwait_us"].to_i).to be_within(200_000).of(500_000)
|
||||||
|
end
|
||||||
|
|
||||||
|
admin_conn.close
|
||||||
|
connections.map(&:close)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,14 @@ async fn test_prepared_statements() {
|
|||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = tokio::task::spawn(async move {
|
||||||
for _ in 0..1000 {
|
for _ in 0..1000 {
|
||||||
sqlx::query("SELECT 1").fetch_all(&pool).await.unwrap();
|
match sqlx::query("SELECT one").fetch_all(&pool).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
if err.to_string().contains("prepared statement") {
|
||||||
|
panic!("prepared statement error: {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ mkdir -p "$deb_dir/etc/systemd/system"
|
|||||||
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
|
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
|
||||||
chmod +x "$deb_dir/usr/bin/pgcat"
|
chmod +x "$deb_dir/usr/bin/pgcat"
|
||||||
|
|
||||||
cp pgcat.toml "$deb_dir/etc/pgcat.toml"
|
cp pgcat.toml "$deb_dir/etc/pgcat.example.toml"
|
||||||
cp pgcat.service "$deb_dir/etc/systemd/system/pgcat.service"
|
cp pgcat.service "$deb_dir/etc/systemd/system/pgcat.service"
|
||||||
|
|
||||||
(cat control | envsubst) > "$deb_dir/DEBIAN/control"
|
(cat control | envsubst) > "$deb_dir/DEBIAN/control"
|
||||||
|
|||||||
Reference in New Issue
Block a user