Compare commits

...

16 Commits

Author SHA1 Message Date
Lev Kokotov
bca5318d5e Fix broken custom config 2022-09-12 15:58:11 -04:00
Lev Kokotov
efd6b2edae Automatic shard detection 2022-09-12 15:07:10 -04:00
Lev Kokotov
075167431d Add Discord link (#164)
* Add Discord link

* move it up

* :)

* hmm

* hmm

* :O
2022-09-08 08:12:37 -07:00
Mostafa Abdelraouf
9514b3b2d1 Clean connection state up after protocol named prepared statement (#163)
* Clean connection state up after protocol named prepared statement

* Avoid cloning + add test

* fmt
2022-09-07 20:37:17 -07:00
Lev Kokotov
6d41640ea9 Send signal even if process is gone (#162)
* Send signal even if process is gone

* hmm

* hmm
2022-09-07 09:22:52 -07:00
Mostafa Abdelraouf
744ceada86 Better logging for failure to get connection from pool (#161) 2022-09-07 08:24:07 -07:00
Mostafa Abdelraouf
a5c8dd69b2 Avoid reporting ProtocolSyncError when admin session disconnects (#160)
* Avoid reporting ProtocolSyncError when admin session disconnects

* rebuild

* rebuild
2022-09-06 22:22:31 -07:00
zainkabani
6a9a4db648 Adds microsecond logging and also reformats duration to include milliseconds (#156)
* Adds microsecond logging and also reformats duration to include milliseconds

* fmt

* attempt to fix cd

* revert
2022-09-05 01:21:27 -07:00
Mostafa Abdelraouf
976b406468 Main Thread Panic when swarmed with clients (#158)
* Main Thread Panic when swarmed with clients

* fix

* fix

* 1024

* fix

* remove test

* Update src/client.rs

* Update src/main.rs

* Update src/client.rs

* Update src/main.rs

Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>
2022-09-05 01:21:06 -07:00
zain-kabani
417358c35d Patch graceful shutdown bug (#157)
* Fixes non-admin client counting error

* Add log when sigterm received and log number of active clients when shutdown timeout is reached
2022-09-05 01:02:49 -07:00
Mostafa Abdelraouf
23a642f4a4 Send DISCARD ALL even if client is not in transaction (#152)
* Send DISCARD ALL even if client is not in transaction

* fmt

* Added tests + avoided sending extra discard all

* Adds set name logic to beginning of handle client

* fmt

* refactor dead code handling

* Refactor reading command tag

* remove unnecessary trim

* Removing debugging statement

* typo

* typo{

* documentation

* edit text

* un-unwrap

* run ci

* run ci

Co-authored-by: Zain Kabani <zain.kabani@instacart.com>
2022-09-01 20:06:55 -07:00
Mostafa Abdelraouf
7f20dc3054 Better handling extended protocol messages in the event of busy pool (#155)
* Better handling for checkout errors during extended protocol messages

* Fix specs

* comment
2022-09-01 15:02:39 -07:00
Mostafa Abdelraouf
36339bd96f Log Address information in connection create/drop (#154)
* Log Address information in connection create/drop

* run ci
2022-09-01 11:16:22 -07:00
Mostafa Abdelraouf
65b69b46d2 Allow running integration tests with coverage locally (#151) 2022-08-30 10:43:45 -07:00
Mostafa Abdelraouf
d48c04a7fb Ruby integration tests (#147)
* Ruby integration tests

* forgot a file

* refactor

* refactoring

* more refactoring

* remove config helper

* try multiple databases

* fix

* more databases

* Use pg stats

* ports

* speed

* Fix tests

* preload library

* comment
2022-08-30 09:14:53 -07:00
zainkabani
2628dec42e Move autoreloader to own tokio task (#148) 2022-08-29 00:08:44 -07:00
25 changed files with 1078 additions and 294 deletions

View File

@@ -15,14 +15,34 @@ jobs:
RUSTFLAGS: "-C instrument-coverage"
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
- image: postgres:14
# auth:
# username: mydockerhub-user
# password: $DOCKERHUB_PASSWORD
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"]
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
- image: postgres:14
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"]
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
- image: postgres:14
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"]
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
- image: postgres:14
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"]
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
# Add steps to the job
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
steps:

View File

@@ -16,6 +16,9 @@ function start_pgcat() {
# Setup the database with shards and user
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f tests/sharding/query_routing_setup.sql
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
@@ -26,7 +29,7 @@ wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/downlo
sudo dpkg -i toxiproxy-2.4.0.deb
# Start Toxiproxy
toxiproxy-server &
LOG_LEVEL=error toxiproxy-server &
sleep 1
# Create a database at port 5433, forward it to Postgres
@@ -87,7 +90,8 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again
cd tests/ruby
sudo gem install bundler
bundle install
ruby tests.rb
bundle exec ruby tests.rb || exit 1
bundle exec rspec *_spec.rb || exit 1
cd ../..
#
@@ -95,7 +99,7 @@ cd ../..
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
#
pip3 install -r tests/python/requirements.txt
python3 tests/python/tests.py
python3 tests/python/tests.py || exit 1
start_pgcat "info"
@@ -105,9 +109,9 @@ psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/n
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW LISTS' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW POOLS' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW VERSION' > /dev/null
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
(! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
export PGPASSWORD=sharding_user

View File

@@ -1,8 +1,11 @@
# PgCat
![PgCat](./pgcat3.png)
##### PgCat: PostgreSQL at petabyte scale
[![CircleCI](https://circleci.com/gh/levkk/pgcat/tree/main.svg?style=svg)](https://circleci.com/gh/levkk/pgcat/tree/main)
![PgCat](./pgcat3.png)
<a href="https://discord.gg/DmyJP3qJ7U" target="_blank">
<img src="https://img.shields.io/discord/1013868243036930099" alt="Join our Discord!" />
</a>
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
@@ -87,6 +90,14 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
Run `cargo test` to run Rust tests.
Run the following commands to run Integration tests locally.
```
cd tests/docker/
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
```
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
@@ -447,7 +458,7 @@ Always good to have a base line.
```
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
@@ -461,7 +472,7 @@ tps = 139443.955722 (including connections establishing)
tps = 142314.859075 (excluding connections establishing)
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
@@ -475,7 +486,7 @@ tps = 150644.840891 (including connections establishing)
tps = 152218.499430 (excluding connections establishing)
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1
@@ -489,7 +500,7 @@ tps = 152517.663404 (including connections establishing)
tps = 153319.188482 (excluding connections establishing)
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
Password:
Password:
starting vacuum...end.
transaction type: <builtin: select only>
scaling factor: 1

View File

@@ -82,6 +82,7 @@ primary_reads_enabled = true
# sha1: A hashing function based on SHA1
#
sharding_function = "pg_bigint_hash"
sharding_key = "id"
# Credentials for users that may connect to this cluster
[pools.sharded_db.users.0]

View File

@@ -59,6 +59,7 @@ pub struct Client<S, T> {
client_server_map: ClientServerMap,
/// Client parameters, e.g. user, client_encoding, etc.
#[allow(dead_code)]
parameters: HashMap<String, String>,
/// Statistics
@@ -82,6 +83,9 @@ pub struct Client<S, T> {
/// Postgres user for this client (This comes from the user in the connection string)
username: String,
/// Application name for this client (defaults to pgcat)
application_name: String,
/// Used to notify clients about an impending shutdown
shutdown: Receiver<()>,
}
@@ -91,7 +95,7 @@ pub async fn client_entrypoint(
mut stream: TcpStream,
client_server_map: ClientServerMap,
shutdown: Receiver<()>,
drain: Sender<i8>,
drain: Sender<i32>,
admin_only: bool,
) -> Result<(), Error> {
// Figure out if the client wants TLS or not.
@@ -200,7 +204,7 @@ pub async fn client_entrypoint(
Ok(mut client) => {
info!("Client {:?} connected (plain)", addr);
if client.is_admin() {
if !client.is_admin() {
let _ = drain.send(1).await;
}
@@ -225,7 +229,7 @@ pub async fn client_entrypoint(
Ok(mut client) => {
info!("Client {:?} issued a cancel query request", addr);
if client.is_admin() {
if !client.is_admin() {
let _ = drain.send(1).await;
}
@@ -365,6 +369,11 @@ where
None => return Err(Error::ClientError),
};
let application_name = match parameters.get("application_name") {
Some(application_name) => application_name,
None => "pgcat",
};
let admin = ["pgcat", "pgbouncer"]
.iter()
.filter(|db| *db == &pool_name)
@@ -493,6 +502,7 @@ where
last_server_id: None,
pool_name: pool_name.clone(),
username: username.clone(),
application_name: application_name.to_string(),
shutdown,
connected_to_server: false,
});
@@ -526,6 +536,7 @@ where
last_server_id: None,
pool_name: String::from("undefined"),
username: String::from("undefined"),
application_name: String::from("undefined"),
shutdown,
connected_to_server: false,
});
@@ -600,10 +611,23 @@ where
message_result = read_message(&mut self.read) => message_result?
};
// Avoid taking a server if the client just wants to disconnect.
if message[0] as char == 'X' {
debug!("Client disconnecting");
return Ok(());
match message[0] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
// we'll be able to allocate a connection. Also, clients do not expect
// the server to respond to these messages so even if we were not able to
// allocate a connection, we wouldn't be able to send back an error message
// to the client so we buffer them and defer the decision to error out or not
// to when we get the S message
'P' | 'B' | 'D' | 'E' => {
self.buffer.put(&message[..]);
continue;
}
'X' => {
debug!("Client disconnecting");
return Ok(());
}
_ => (),
}
// Handle admin database queries.
@@ -638,7 +662,7 @@ where
// Normal query, not a custom command.
None => {
if query_router.query_parser_enabled() {
query_router.infer_role(message.clone());
query_router.infer_role_and_shard(message.clone());
}
}
@@ -714,22 +738,19 @@ where
conn
}
Err(err) => {
// Clients do not expect to get SystemError followed by ReadyForQuery in the middle
// of extended protocol submission. So we will hold off on sending the actual error
// message to the client until we get 'S' message
match message[0] as char {
'P' | 'B' | 'E' | 'D' => (),
_ => {
error_response(
&mut self.write,
"could not get connection from the pool",
)
.await?;
}
};
error!("Could not get connection from pool: {:?}", err);
// Client is attempting to get results from the server,
// but we were unable to grab a connection from the pool
// We'll send back an error message and clean the extended
// protocol buffer
if message[0] as char == 'S' {
error!("Got Sync message but failed to get a connection from the pool");
self.buffer.clear();
}
error_response(&mut self.write, "could not get connection from the pool")
.await?;
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
continue;
}
};
@@ -759,13 +780,10 @@ where
server.address()
);
// Set application_name if any.
// TODO: investigate other parameters and set them too.
if self.parameters.contains_key("application_name") {
server
.set_name(&self.parameters["application_name"])
.await?;
}
// Set application_name.
server.set_name(&self.application_name).await?;
// Transaction loop. Multiple queries can be issued by the client here.
// The connection belongs to the client until the transaction is over,
@@ -782,12 +800,7 @@ where
Err(err) => {
// Client disconnected inside a transaction.
// Clean up the server and re-use it.
// This prevents connection thrashing by bad clients.
if server.in_transaction() {
server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?;
}
server.checkin_cleanup().await?;
return Err(err);
}
@@ -829,16 +842,7 @@ where
// Terminate
'X' => {
// Client closing. Rollback and clean up
// connection before releasing into the pool.
// Pgbouncer closes the connection which leads to
// connection thrashing when clients misbehave.
if server.in_transaction() {
server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?;
}
server.checkin_cleanup().await?;
self.release();
return Ok(());
@@ -875,6 +879,23 @@ where
self.buffer.put(&original[..]);
// Clone after freeze does not allocate
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
// Almost certainly true
if first_message_code == 'P' {
// Message layout
// P followed by 32 int followed by null-terminated statement name
// So message code should be in offset 0 of the buffer, first character
// in prepared statement name would be index 5
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
if first_char_in_name != 0 {
// This is a named prepared statement
// Server connection state will need to be cleared at checkin
server.mark_dirty();
}
}
self.send_and_receive_loop(
code,
self.buffer.clone(),
@@ -942,8 +963,10 @@ where
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
server.checkin_cleanup().await?;
self.stats.server_idle(server.process_id(), address.id);
self.connected_to_server = false;
self.release();
self.stats.client_idle(self.process_id, address.id);
}

View File

@@ -185,6 +185,7 @@ pub struct Pool {
pub query_parser_enabled: bool,
pub primary_reads_enabled: bool,
pub sharding_function: String,
pub sharding_key: Option<String>,
pub shards: HashMap<String, Shard>,
pub users: HashMap<String, User>,
}
@@ -198,6 +199,7 @@ impl Default for Pool {
query_parser_enabled: false,
primary_reads_enabled: true,
sharding_function: "pg_bigint_hash".to_string(),
sharding_key: None,
}
}
}

View File

@@ -74,7 +74,8 @@ use crate::stats::{Collector, Reporter, REPORTER};
#[tokio::main(worker_threads = 4)]
async fn main() {
env_logger::init();
env_logger::builder().format_timestamp_micros().init();
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
if !query_router::QueryRouter::setup() {
@@ -154,12 +155,31 @@ async fn main() {
info!("Config autoreloader: {}", config.general.autoreload);
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
let autoreload_client_server_map = client_server_map.clone();
tokio::task::spawn(async move {
loop {
autoreload_interval.tick().await;
if config.general.autoreload {
info!("Automatically reloading config");
match reload_config(autoreload_client_server_map.clone()).await {
Ok(changed) => {
if changed {
get_config().show()
}
}
Err(_) => (),
};
}
}
});
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let (drain_tx, mut drain_rx) = mpsc::channel::<i8>(2048);
let (drain_tx, mut drain_rx) = mpsc::channel::<i32>(2048);
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
info!("Waiting for clients");
@@ -182,21 +202,6 @@ async fn main() {
get_config().show();
},
_ = autoreload_interval.tick() => {
if config.general.autoreload {
info!("Automatically reloading config");
match reload_config(client_server_map.clone()).await {
Ok(changed) => {
if changed {
get_config().show()
}
}
Err(_) => (),
};
}
},
// Initiate graceful shutdown sequence on sig int
_ = interrupt_signal.recv() => {
info!("Got SIGINT, waiting for client connection drain now");
@@ -217,13 +222,16 @@ async fn main() {
interval.tick().await;
// We're done waiting.
error!("Timed out waiting for clients");
error!("Graceful shutdown timed out. {} active clients being closed", total_clients);
let _ = exit_tx.send(()).await;
});
},
_ = term_signal.recv() => break,
_ = term_signal.recv() => {
info!("Got SIGTERM, closing with {} clients active", total_clients);
break;
},
new_client = listener.accept() => {
let (socket, addr) = match new_client {
@@ -300,34 +308,18 @@ async fn main() {
///
/// * `duration` - A duration of time
fn format_duration(duration: &chrono::Duration) -> String {
let seconds = {
let seconds = duration.num_seconds() % 60;
if seconds < 10 {
format!("0{}", seconds)
} else {
format!("{}", seconds)
}
};
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
let minutes = {
let minutes = duration.num_minutes() % 60;
if minutes < 10 {
format!("0{}", minutes)
} else {
format!("{}", minutes)
}
};
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
let hours = {
let hours = duration.num_hours() % 24;
if hours < 10 {
format!("0{}", hours)
} else {
format!("{}", hours)
}
};
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
let hours = format!("{:0>2}", duration.num_hours() % 24);
let days = duration.num_days().to_string();
format!("{}d {}:{}:{}", days, hours, minutes, seconds)
format!(
"{}d {}:{}:{}.{}",
days, hours, minutes, seconds, milliseconds
)
}

View File

@@ -8,6 +8,7 @@ use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
use rand::seq::SliceRandom;
use rand::thread_rng;
use regex::Regex;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
@@ -68,6 +69,9 @@ pub struct PoolSettings {
// Sharding function.
pub sharding_function: ShardingFunction,
// Automatically detect sharding key in query.
pub sharding_key_regex: Option<Regex>,
}
impl Default for PoolSettings {
@@ -80,6 +84,7 @@ impl Default for PoolSettings {
query_parser_enabled: false,
primary_reads_enabled: true,
sharding_function: ShardingFunction::PgBigintHash,
sharding_key_regex: None,
}
}
}
@@ -229,6 +234,20 @@ impl ConnectionPool {
"sha1" => ShardingFunction::Sha1,
_ => unreachable!(),
},
sharding_key_regex: match &pool_config.sharding_key {
Some(sharding_key) => match Regex::new(&format!(
r"(?i) *{} *= *'?([0-9]+)'?",
sharding_key
)) {
Ok(regex) => Some(regex),
Err(err) => {
error!("Sharding key regex error: {:?}", err);
return Err(Error::BadConfig);
}
},
None => None,
},
},
};
@@ -559,11 +578,7 @@ impl ManageConnection for ServerPool {
/// Attempts to create a new connection.
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
info!(
"Creating a new connection to {:?} using user {:?}",
self.address.name(),
self.user.username
);
info!("Creating a new server connection {:?}", self.address);
// Put a temporary process_id into the stats
// for server login.

View File

@@ -55,6 +55,8 @@ pub struct QueryRouter {
/// Include the primary into the replica pool for reads.
primary_reads_enabled: bool,
set_manually: bool,
/// Pool configuration.
pool_settings: PoolSettings,
}
@@ -97,6 +99,7 @@ impl QueryRouter {
active_role: None,
query_parser_enabled: false,
primary_reads_enabled: false,
set_manually: false,
pool_settings: PoolSettings::default(),
}
}
@@ -104,6 +107,11 @@ impl QueryRouter {
/// Pool settings can change because of a config reload.
pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) {
self.pool_settings = pool_settings;
if !self.set_manually {
self.query_parser_enabled = self.pool_settings.query_parser_enabled;
self.primary_reads_enabled = self.pool_settings.primary_reads_enabled;
}
}
/// Try to parse a command and execute it.
@@ -205,6 +213,8 @@ impl QueryRouter {
}
Command::SetServerRole => {
self.set_manually = true;
self.active_role = match value.to_ascii_lowercase().as_ref() {
"primary" => {
self.query_parser_enabled = false;
@@ -228,7 +238,7 @@ impl QueryRouter {
"default" => {
self.active_role = self.pool_settings.default_role;
self.query_parser_enabled = self.query_parser_enabled;
self.query_parser_enabled = self.pool_settings.query_parser_enabled;
self.active_role
}
@@ -237,6 +247,8 @@ impl QueryRouter {
}
Command::SetPrimaryReads => {
self.set_manually = true;
if value == "on" {
debug!("Setting primary reads to on");
self.primary_reads_enabled = true;
@@ -256,7 +268,7 @@ impl QueryRouter {
}
/// Try to infer which server to connect to based on the contents of the query.
pub fn infer_role(&mut self, mut buf: BytesMut) -> bool {
pub fn infer_role_and_shard(&mut self, mut buf: BytesMut) -> bool {
debug!("Inferring role");
let code = buf.get_u8() as char;
@@ -297,6 +309,31 @@ impl QueryRouter {
_ => return false,
};
// First find the shard key
match &self.pool_settings.sharding_key_regex {
Some(re) => {
match re.captures(&query) {
Some(group) => match group.get(1) {
Some(value) => {
let value = value.as_str().parse::<i64>().unwrap();
let sharder = Sharder::new(
self.pool_settings.shards,
self.pool_settings.sharding_function,
);
let shard = sharder.shard(value);
self.active_shard = Some(shard);
debug!("Automatically routing to shard {}", shard);
}
None => (),
},
None => (),
};
}
None => (),
};
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
Ok(ast) => ast,
Err(err) => {
@@ -373,7 +410,7 @@ mod test {
}
#[test]
fn test_infer_role_replica() {
fn test_infer_role_and_shard_replica() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None);
@@ -382,22 +419,25 @@ mod test {
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO off")) != None);
let queries = vec![
simple_query("SELECT * FROM items WHERE id = 5"),
simple_query("SELECT * FROM items WHERE id = 4"),
simple_query(
"SELECT id, name, value FROM items INNER JOIN prices ON item.id = prices.item_id",
),
simple_query("WITH t AS (SELECT * FROM items) SELECT * FROM t"),
];
for query in queries {
let shards = vec![0, 0, 0];
for (idx, query) in queries.iter().enumerate() {
// It's a recognized query
assert!(qr.infer_role(query));
assert!(qr.infer_role_and_shard(query.clone()));
assert_eq!(qr.role(), Some(Role::Replica));
assert_eq!(qr.shard(), shards[idx]);
}
}
#[test]
fn test_infer_role_primary() {
fn test_infer_role_and_shard_primary() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
@@ -410,24 +450,24 @@ mod test {
for query in queries {
// It's a recognized query
assert!(qr.infer_role(query));
assert!(qr.infer_role_and_shard(query));
assert_eq!(qr.role(), Some(Role::Primary));
}
}
#[test]
fn test_infer_role_primary_reads_enabled() {
fn test_infer_role_and_shard_primary_reads_enabled() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
let query = simple_query("SELECT * FROM items WHERE id = 5");
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None);
assert!(qr.infer_role(query));
assert!(qr.infer_role_and_shard(query));
assert_eq!(qr.role(), None);
}
#[test]
fn test_infer_role_parse_prepared() {
fn test_infer_role_and_shard_parse_prepared() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'"));
@@ -442,7 +482,7 @@ mod test {
res.put(prepared_stmt);
res.put_i16(0);
assert!(qr.infer_role(res));
assert!(qr.infer_role_and_shard(res));
assert_eq!(qr.role(), Some(Role::Replica));
}
@@ -606,17 +646,17 @@ mod test {
assert_eq!(qr.role(), None);
let query = simple_query("INSERT INTO test_table VALUES (1)");
assert_eq!(qr.infer_role(query), true);
assert_eq!(qr.infer_role_and_shard(query), true);
assert_eq!(qr.role(), Some(Role::Primary));
let query = simple_query("SELECT * FROM test_table");
assert_eq!(qr.infer_role(query), true);
assert_eq!(qr.infer_role_and_shard(query), true);
assert_eq!(qr.role(), Some(Role::Replica));
assert!(qr.query_parser_enabled());
let query = simple_query("SET SERVER ROLE TO 'default'");
assert!(qr.try_execute_command(query) != None);
assert!(qr.query_parser_enabled());
assert!(!qr.query_parser_enabled());
}
#[test]
@@ -629,7 +669,8 @@ mod test {
user: crate::config::User::default(),
default_role: Some(Role::Replica),
query_parser_enabled: true,
primary_reads_enabled: false,
primary_reads_enabled: true,
sharding_key_regex: None,
sharding_function: ShardingFunction::PgBigintHash,
};
let mut qr = QueryRouter::new();
@@ -643,8 +684,8 @@ mod test {
assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None);
assert_eq!(qr.query_parser_enabled, false);
assert_eq!(qr.primary_reads_enabled, false);
assert_eq!(qr.query_parser_enabled, true);
assert_eq!(qr.primary_reads_enabled, true);
let q1 = simple_query("SET SERVER ROLE TO 'primary'");
assert!(qr.try_execute_command(q1) != None);

View File

@@ -1,7 +1,8 @@
/// Implementation of the PostgreSQL server (database) protocol.
/// Here we are pretending to the a Postgres client.
use bytes::{Buf, BufMut, BytesMut};
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use std::io::Read;
use std::time::SystemTime;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{
@@ -48,6 +49,9 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,
/// If server connection requires a DISCARD ALL before checkin
needs_cleanup: bool,
/// Mapping of clients and servers used for query cancellation.
client_server_map: ClientServerMap,
@@ -316,6 +320,7 @@ impl Server {
in_transaction: false,
data_available: false,
bad: false,
needs_cleanup: false,
client_server_map: client_server_map,
connected_at: chrono::offset::Utc::now().naive_utc(),
stats: stats,
@@ -440,6 +445,29 @@ impl Server {
break;
}
// CommandComplete
'C' => {
let mut command_tag = String::new();
match message.reader().read_to_string(&mut command_tag) {
Ok(_) => {
// Non-exhaustive list of commands that are likely to change session variables/resources
// which can leak between clients. This is a best effort to block bad clients
// from poisoning a transaction-mode pool by setting inappropriate session variables
match command_tag.as_str() {
"SET\0" | "PREPARE\0" => {
debug!("Server connection marked for clean up");
self.needs_cleanup = true;
}
_ => (),
}
}
Err(err) => {
warn!("Encountered an error while parsing CommandTag {}", err);
}
}
}
// DataRow
'D' => {
// More data is available after this message, this is not the end of the reply.
@@ -553,14 +581,43 @@ impl Server {
Ok(())
}
/// Perform any necessary cleanup before putting the server
/// connection back in the pool
pub async fn checkin_cleanup(&mut self) -> Result<(), Error> {
// Client disconnected with an open transaction on the server connection.
// Pgbouncer behavior is to close the server connection but that can cause
// server connection thrashing if clients repeatedly do this.
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
if self.in_transaction() {
self.query("ROLLBACK").await?;
}
// Client disconnected but it perfromed session-altering operations such as
// SET statement_timeout to 1 or create a prepared statement. We clear that
// to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.needs_cleanup {
self.query("DISCARD ALL").await?;
self.needs_cleanup = false;
}
return Ok(());
}
/// A shorthand for `SET application_name = $1`.
#[allow(dead_code)]
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
if self.application_name != name {
self.application_name = name.to_string();
Ok(self
// We don't want `SET application_name` to mark the server connection
// as needing cleanup
let needs_cleanup_before = self.needs_cleanup;
let result = Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?)
.await?);
self.needs_cleanup = needs_cleanup_before;
return result;
} else {
Ok(())
}
@@ -581,6 +638,11 @@ impl Server {
pub fn last_activity(&self) -> SystemTime {
self.last_activity
}
// Marks a connection as needing DISCARD ALL at checkin
pub fn mark_dirty(&mut self) {
self.needs_cleanup = true;
}
}
impl Drop for Server {
@@ -607,7 +669,8 @@ impl Drop for Server {
let duration = now - self.connected_at;
info!(
"Server connection closed, session duration: {}",
"Server connection closed {:?}, session duration: {}",
self.address,
crate::format_duration(&duration)
);
}

5
tests/docker/Dockerfile Normal file
View File

@@ -0,0 +1,5 @@
FROM rust:bullseye
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov sudo curl -y
RUN cargo install cargo-binutils rustfilt
RUN rustup component add llvm-tools-preview

View File

@@ -0,0 +1,47 @@
version: "3"
services:
pg1:
image: postgres:14
network_mode: "service:main"
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"]
pg2:
image: postgres:14
network_mode: "service:main"
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"]
pg3:
image: postgres:14
network_mode: "service:main"
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"]
pg4:
image: postgres:14
network_mode: "service:main"
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"]
main:
build: .
command: ["bash", "/app/tests/docker/run.sh"]
environment:
RUSTFLAGS: "-C instrument-coverage"
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
volumes:
- ../../:/app/
- /app/target/

21
tests/docker/run.sh Normal file
View File

@@ -0,0 +1,21 @@
#!/bin/bash
rm /app/*.profraw || true
rm /app/pgcat.profdata || true
rm -rf /app/cov || true
cd /app/
cargo build
cargo test --tests
bash .circleci/run_tests.sh
rust-profdata merge -sparse pgcat-*.profraw -o pgcat.profdata
rust-cov export -ignore-filename-regex="rustc|registry" -Xdemangler=rustfilt -instr-profile=pgcat.profdata --object ./target/debug/pgcat --format lcov > ./lcov.info
genhtml lcov.info --output-directory cov --prefix $(pwd)
rm /app/*.profraw
rm /app/pgcat.profdata

View File

@@ -18,9 +18,14 @@ def pgcat_start():
def pg_cat_send_signal(signal: signal.Signals):
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
try:
for proc in psutil.process_iter(["pid", "name"]):
if "pgcat" == proc.name():
os.kill(proc.pid, signal)
except Exception as e:
# The process can be gone when we send this signal
print(e)
if signal == signal.SIGTERM:
# Returns 0 if pgcat process exists
time.sleep(2)

View File

@@ -1,6 +1,8 @@
source "https://rubygems.org"
gem "pg"
gem "activerecord"
gem "toml"
gem "rspec"
gem "rubocop"
gem "toml", "~> 0.3.0"
gem "toxiproxy"
gem "activerecord"

View File

@@ -13,6 +13,7 @@ GEM
tzinfo (~> 2.0)
ast (2.4.2)
concurrent-ruby (1.1.10)
diff-lcs (1.5.0)
i18n (1.11.0)
concurrent-ruby (~> 1.0)
minitest (5.16.2)
@@ -24,6 +25,19 @@ GEM
rainbow (3.1.1)
regexp_parser (2.3.1)
rexml (3.2.5)
rspec (3.11.0)
rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0)
rspec-mocks (~> 3.11.0)
rspec-core (3.11.0)
rspec-support (~> 3.11.0)
rspec-expectations (3.11.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.11.0)
rspec-mocks (3.11.1)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.11.0)
rspec-support (3.11.0)
rubocop (1.29.0)
parallel (~> 1.10)
parser (>= 3.1.0.0)
@@ -38,19 +52,23 @@ GEM
ruby-progressbar (1.11.0)
toml (0.3.0)
parslet (>= 1.8.0, < 3.0.0)
toxiproxy (2.0.1)
tzinfo (2.0.4)
concurrent-ruby (~> 1.0)
unicode-display_width (2.1.0)
PLATFORMS
aarch64-linux
arm64-darwin-21
x86_64-linux
DEPENDENCIES
activerecord
pg
rspec
rubocop
toml (~> 0.3.0)
toml
toxiproxy
BUNDLED WITH
2.3.7
2.3.21

View File

@@ -0,0 +1,82 @@
require 'pg'
require 'toxiproxy'
class PgInstance
attr_reader :port
attr_reader :username
attr_reader :password
attr_reader :database_name
def initialize(port, username, password, database_name)
@original_port = port
@toxiproxy_port = 10000 + port.to_i
@port = @toxiproxy_port
@username = username
@password = password
@database_name = database_name
@toxiproxy_name = "database_#{@original_port}"
Toxiproxy.populate([{
name: @toxiproxy_name,
listen: "0.0.0.0:#{@toxiproxy_port}",
upstream: "localhost:#{@original_port}",
}])
# Toxiproxy server will outlive our PgInstance objects
# so we want to destroy our proxies before exiting
# Ruby finalizer is ideal for doing this
ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
end
def with_connection
conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
yield conn
ensure
conn&.close
end
def reset
reset_toxics
reset_stats
end
def toxiproxy
Toxiproxy[@toxiproxy_name]
end
def take_down
if block_given?
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
else
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
end
end
def add_latency(latency)
if block_given?
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
else
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
end
end
def delete_proxy
Toxiproxy[@toxiproxy_name].delete
end
def reset_toxics
Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
end
def reset_stats
with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
end
def count_query(query)
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
end
def count_select_1_plus_2
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
end
end

View File

@@ -0,0 +1,100 @@
require 'json'
require 'ostruct'
require_relative 'pgcat_process'
require_relative 'pg_instance'
module Helpers
module Pgcat
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
"statement_timeout" => 0,
"username" => "sharding_user"
}
pgcat = PgcatProcess.new("info")
primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0")
primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1")
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
pgcat_cfg = pgcat.current_config
pgcat_cfg["pools"] = {
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] },
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
},
"users" => { "0" => user }
}
}
pgcat.update_config(pgcat_cfg)
pgcat.start
pgcat.wait_until_ready
OpenStruct.new.tap do |struct|
struct.pgcat = pgcat
struct.shards = [primary0, primary1, primary2]
struct.all_databases = [primary0, primary1, primary2]
end
end
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
"statement_timeout" => 0,
"username" => "sharding_user"
}
pgcat = PgcatProcess.new("info")
pgcat_cfg = pgcat.current_config
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
replica0 = PgInstance.new(7432, user["username"], user["password"], "shard0")
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
# Main proxy configs
pgcat_cfg["pools"] = {
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
"shards" => {
"0" => {
"database" => "shard0",
"servers" => [
["localhost", primary.port.to_s, "primary"],
["localhost", replica0.port.to_s, "replica"],
["localhost", replica1.port.to_s, "replica"],
["localhost", replica2.port.to_s, "replica"]
]
},
},
"users" => { "0" => user }
}
}
pgcat_cfg["general"]["port"] = pgcat.port
pgcat.update_config(pgcat_cfg)
pgcat.start
pgcat.wait_until_ready
OpenStruct.new.tap do |struct|
struct.pgcat = pgcat
struct.primary = primary
struct.replicas = [replica0, replica1, replica2]
struct.all_databases = [primary, replica0, replica1, replica2]
end
end
end
end

View File

@@ -0,0 +1,116 @@
require 'pg'
require 'toml'
require 'fileutils'
require 'securerandom'
class PgcatProcess
attr_reader :port
attr_reader :pid
def self.finalize(pid, log_filename, config_filename)
`kill #{pid}`
File.delete(config_filename) if File.exist?(config_filename)
File.delete(log_filename) if File.exist?(log_filename)
end
def initialize(log_level)
@env = {"RUST_LOG" => log_level}
@port = rand(20000..32760)
@log_level = log_level
@log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log"
@config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml"
@command = "../../target/debug/pgcat #{@config_filename}"
FileUtils.cp("../../pgcat.toml", @config_filename)
cfg = current_config
cfg["general"]["port"] = @port.to_i
cfg["general"]["enable_prometheus_exporter"] = false
update_config(cfg)
end
def logs
File.read(@log_filename)
end
def update_config(config_hash)
@original_config = current_config
output_to_write = TOML::Generator.new(config_hash).body
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,')
File.write(@config_filename, output_to_write)
end
def current_config
old_cfg = File.read(@config_filename)
loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",')
TOML.load(loadable_string)
end
def reload_config
`kill -s HUP #{@pid}`
sleep 0.1
end
def start
raise StandardError, "Process is already started" unless @pid.nil?
@pid = Process.spawn(@env, @command, err: @log_filename, out: @log_filename)
ObjectSpace.define_finalizer(@log_filename, proc { PgcatProcess.finalize(@pid, @log_filename, @config_filename) })
return self
end
def wait_until_ready
exc = nil
10.times do
PG::connect(example_connection_string).close
return self
rescue => e
exc = e
sleep(0.5)
end
puts exc
raise StandardError, "Process #{@pid} never became ready. Logs #{logs}"
end
def stop
`kill #{@pid}`
sleep 0.1
end
def shutdown
stop
File.delete(@config_filename) if File.exist?(@config_filename)
File.delete(@log_filename) if File.exist?(@log_filename)
end
def admin_connection_string
cfg = current_config
username = cfg["general"]["admin_username"]
password = cfg["general"]["admin_password"]
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/pgcat"
end
def connection_string(pool_name, username)
cfg = current_config
user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username }
password = user_obj["password"]
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{pool_name}"
end
def example_connection_string
cfg = current_config
first_pool_name = cfg["pools"].keys[0]
db_name = first_pool_name
username = cfg["pools"][first_pool_name]["users"]["0"]["username"]
password = cfg["pools"][first_pool_name]["users"]["0"]["password"]
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}"
end
end

View File

@@ -0,0 +1,61 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "Load Balancing" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
context "under regular circumstances" do
it "balances query volume between all instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
query_count = QUERY_COUNT
expected_share = query_count / processes.all_databases.count
failed_count = 0
query_count.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end
expect(failed_count).to eq(0)
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
end
end
context "when some replicas are down" do
it "balances query volume between working instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
failed_count = 0
processes[:replicas][0].take_down do
processes[:replicas][1].take_down do
QUERY_COUNT.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
end
end
expect(failed_count).to eq(2)
processes.all_databases.each do |instance|
queries_routed = instance.count_select_1_plus_2
if processes.replicas[0..1].include?(instance)
expect(queries_routed).to eq(0)
else
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
end
end
end
end

193
tests/ruby/misc_spec.rb Normal file
View File

@@ -0,0 +1,193 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "Miscellaneous" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "Extended Protocol handling" do
it "does not send packets that client does not expect during extended protocol sequence" do
new_configs = processes.pgcat.current_config
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
processes.pgcat.update_config(new_configs)
processes.pgcat.reload_config
25.times do
Thread.new do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
ensure
conn&.close
end
end
sleep(0.5)
conn_under_test = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
stdout, stderr = with_captured_stdout_stderr do
15.times do |i|
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
sleep 1
end
end
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
end
end
describe "Pool recycling after config reload" do
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
it "should update pools for new clients and clients that are no longer in transaction" do
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
server_conn.async_exec("BEGIN")
# No config change yet, client should set old configs
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
expect(current_datebase_from_pg).to eq('shard0')
# Swap shards
new_config = processes.pgcat.current_config
shard0 = new_config["pools"]["sharded_db"]["shards"]["0"]
shard1 = new_config["pools"]["sharded_db"]["shards"]["1"]
new_config["pools"]["sharded_db"]["shards"]["0"] = shard1
new_config["pools"]["sharded_db"]["shards"]["1"] = shard0
# Reload config
processes.pgcat.update_config(new_config)
processes.pgcat.reload_config
sleep 0.5
# Config changed but transaction is in progress, client should set old configs
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
expect(current_datebase_from_pg).to eq('shard0')
server_conn.async_exec("COMMIT")
# Transaction finished, client should get new configs
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
expect(current_datebase_from_pg).to eq('shard1')
# New connection should get new configs
server_conn.close()
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
expect(current_datebase_from_pg).to eq('shard1')
end
end
describe "Clients closing connection in the middle of transaction" do
it "sends a rollback to the server" do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("BEGIN")
conn.close
expect(processes.primary.count_query("ROLLBACK")).to eq(1)
end
end
describe "Server version reporting" do
it "reports correct version for normal and admin databases" do
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
expect(server_conn.server_version).not_to eq(0)
server_conn.close
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
expect(admin_conn.server_version).not_to eq(0)
admin_conn.close
end
end
describe "State clearance" do
context "session mode" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }
it "Clears state before connection checkin" do
# Both modes of operation should not raise
# ERROR: prepared statement "prepared_q" already exists
15.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
conn.close
end
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
conn.async_exec("SET statement_timeout to 1000")
current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s")
conn.close
end
it "Does not send DISCARD ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.async_exec("SET statement_timeout to 5000")
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
end
end
context "transaction mode" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }
it "Clears state before connection checkin" do
# Both modes of operation should not raise
# ERROR: prepared statement "prepared_q" already exists
15.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
conn.close
end
15.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.prepare("prepared_q", "SELECT $1")
conn.close
end
end
it "Does not send DISCARD ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.exec_params("SELECT $1", [1])
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.async_exec("SET statement_timeout to 5000")
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
end
end
end
end

View File

@@ -0,0 +1,81 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "Routing" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "SET ROLE" do
context "primary" do
it "routes queries only to primary" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
query_count = 30
failed_count = 0
query_count.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end
expect(failed_count).to eq(0)
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to eq(0)
end
expect(processes.primary.count_select_1_plus_2).to eq(query_count)
end
end
context "replica" do
it "routes queries only to replicas" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'replica'")
expected_share = QUERY_COUNT / processes.replicas.count
failed_count = 0
QUERY_COUNT.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end
expect(failed_count).to eq(0)
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
expect(processes.primary.count_select_1_plus_2).to eq(0)
end
end
context "any" do
it "routes queries to all instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'any'")
expected_share = QUERY_COUNT / processes.all_databases.count
failed_count = 0
QUERY_COUNT.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end
expect(failed_count).to eq(0)
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
end
end
end
end

21
tests/ruby/spec_helper.rb Normal file
View File

@@ -0,0 +1,21 @@
# frozen_string_literal: true
require 'pg'
require_relative 'helpers/pgcat_helper'
QUERY_COUNT = 300
MARGIN_OF_ERROR = 0.30
def with_captured_stdout_stderr
sout = STDOUT.clone
serr = STDERR.clone
STDOUT.reopen("/tmp/out.txt", "w+")
STDERR.reopen("/tmp/err.txt", "w+")
STDOUT.sync = true
STDERR.sync = true
yield
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
ensure
STDOUT.reopen(sout)
STDERR.reopen(serr)
end

View File

@@ -1,93 +1,6 @@
# frozen_string_literal: true
require 'active_record'
require 'pg'
require 'toml'
$stdout.sync = true
$stderr.sync = true
class ConfigEditor
def initialize
@original_config_text = File.read('../../.circleci/pgcat.toml')
text_to_load = @original_config_text.gsub("5432", "\"5432\"")
@original_configs = TOML.load(text_to_load)
end
def original_configs
TOML.load(TOML::Generator.new(@original_configs).body)
end
def with_modified_configs(new_configs)
text_to_write = TOML::Generator.new(new_configs).body
text_to_write = text_to_write.gsub("\"5432\"", "5432")
File.write('../../.circleci/pgcat.toml', text_to_write)
yield
ensure
File.write('../../.circleci/pgcat.toml', @original_config_text)
end
end
def with_captured_stdout_stderr
sout = STDOUT.clone
serr = STDERR.clone
STDOUT.reopen("/tmp/out.txt", "w+")
STDERR.reopen("/tmp/err.txt", "w+")
STDOUT.sync = true
STDERR.sync = true
yield
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
ensure
STDOUT.reopen(sout)
STDERR.reopen(serr)
end
def test_extended_protocol_pooler_errors
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
conf_editor = ConfigEditor.new
new_configs = conf_editor.original_configs
# shorter timeouts
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db"
10.times do
Thread.new do
conn = PG::connect(conn_str)
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
ensure
conn&.close
end
end
sleep(0.5)
conn_under_test = PG::connect(conn_str)
stdout, stderr = with_captured_stdout_stderr do
5.times do |i|
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
sleep 1
end
end
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
puts "Pool checkout errors not breaking clients passed"
ensure
sleep 1
admin_conn.async_exec("RELOAD") # Reset state
conn_under_test&.close
end
test_extended_protocol_pooler_errors
require 'active_record'
# Uncomment these two to see all queries.
# ActiveRecord.verbose_query_logs = true
@@ -198,68 +111,3 @@ begin
rescue ActiveRecord::StatementInvalid
puts 'OK'
end
# Test evil clients
def poorly_behaved_client
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
conn.async_exec 'BEGIN'
conn.async_exec 'SELECT 1'
conn.close
puts 'Bad client ok'
end
25.times do
poorly_behaved_client
end
def test_server_parameters
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
raise StandardError, "Bad server version" if server_conn.server_version == 0
server_conn.close
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
raise StandardError, "Bad server version" if admin_conn.server_version == 0
admin_conn.close
puts 'Server parameters ok'
end
def test_reload_pool_recycling
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
server_conn.async_exec("BEGIN")
conf_editor = ConfigEditor.new
new_configs = conf_editor.original_configs
# swap shards
new_configs["pools"]["sharded_db"]["shards"]["0"]["database"] = "shard1"
new_configs["pools"]["sharded_db"]["shards"]["1"]["database"] = "shard0"
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
server_conn.async_exec("COMMIT;")
# Transaction finished, client should get new configs
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
server_conn.close()
# New connection should get new configs
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
ensure
admin_conn.async_exec("RELOAD") # Go back to old state
admin_conn.close
server_conn.close
puts "Pool Recycling okay!"
end
test_reload_pool_recycling

View File

@@ -70,23 +70,35 @@ GRANT CONNECT ON DATABASE shard2 TO other_user;
GRANT CONNECT ON DATABASE some_db TO simple_user;
\c shard0
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
GRANT ALL ON SCHEMA public TO sharding_user;
GRANT ALL ON TABLE data TO sharding_user;
GRANT ALL ON SCHEMA public TO other_user;
GRANT ALL ON TABLE data TO other_user;
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
\c shard1
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
GRANT ALL ON SCHEMA public TO sharding_user;
GRANT ALL ON TABLE data TO sharding_user;
GRANT ALL ON SCHEMA public TO other_user;
GRANT ALL ON TABLE data TO other_user;
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
\c shard2
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
GRANT ALL ON SCHEMA public TO sharding_user;
GRANT ALL ON TABLE data TO sharding_user;
GRANT ALL ON SCHEMA public TO other_user;
GRANT ALL ON TABLE data TO other_user;
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
\c some_db
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO simple_user;
GRANT ALL ON SCHEMA public TO simple_user;
GRANT ALL ON TABLE data TO simple_user;