mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
2 Commits
levkk-auto
...
levkk-sear
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5872354c3e | ||
|
|
48bb6ebeef |
@@ -15,34 +15,14 @@ jobs:
|
||||
RUSTFLAGS: "-C instrument-coverage"
|
||||
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
# auth:
|
||||
# username: mydockerhub-user
|
||||
# password: $DOCKERHUB_PASSWORD
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
|
||||
# Add steps to the job
|
||||
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
||||
steps:
|
||||
|
||||
@@ -108,6 +108,7 @@ servers = [
|
||||
]
|
||||
# Database name (e.g. "postgres")
|
||||
database = "shard0"
|
||||
search_path = "\"$user\",public"
|
||||
|
||||
[pools.sharded_db.shards.1]
|
||||
servers = [
|
||||
|
||||
@@ -16,9 +16,6 @@ function start_pgcat() {
|
||||
|
||||
# Setup the database with shards and user
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||
@@ -29,7 +26,7 @@ wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/downlo
|
||||
sudo dpkg -i toxiproxy-2.4.0.deb
|
||||
|
||||
# Start Toxiproxy
|
||||
LOG_LEVEL=error toxiproxy-server &
|
||||
toxiproxy-server &
|
||||
sleep 1
|
||||
|
||||
# Create a database at port 5433, forward it to Postgres
|
||||
@@ -90,8 +87,7 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again
|
||||
cd tests/ruby
|
||||
sudo gem install bundler
|
||||
bundle install
|
||||
bundle exec ruby tests.rb || exit 1
|
||||
bundle exec rspec *_spec.rb || exit 1
|
||||
ruby tests.rb
|
||||
cd ../..
|
||||
|
||||
#
|
||||
@@ -99,7 +95,7 @@ cd ../..
|
||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||
#
|
||||
pip3 install -r tests/python/requirements.txt
|
||||
python3 tests/python/tests.py || exit 1
|
||||
python3 tests/python/tests.py
|
||||
|
||||
start_pgcat "info"
|
||||
|
||||
@@ -109,9 +105,9 @@ psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/n
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW LISTS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW POOLS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW VERSION' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
||||
(! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||
export PGPASSWORD=sharding_user
|
||||
|
||||
25
README.md
25
README.md
@@ -1,11 +1,8 @@
|
||||

|
||||
|
||||
##### PgCat: PostgreSQL at petabyte scale
|
||||
# PgCat
|
||||
|
||||
[](https://circleci.com/gh/levkk/pgcat/tree/main)
|
||||
<a href="https://discord.gg/DmyJP3qJ7U" target="_blank">
|
||||
<img src="https://img.shields.io/discord/1013868243036930099" alt="Join our Discord!" />
|
||||
</a>
|
||||
|
||||

|
||||
|
||||
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
|
||||
|
||||
@@ -90,14 +87,6 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
|
||||
|
||||
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
||||
|
||||
Run `cargo test` to run Rust tests.
|
||||
|
||||
Run the following commands to run Integration tests locally.
|
||||
```
|
||||
cd tests/docker/
|
||||
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
|
||||
```
|
||||
|
||||
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
||||
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
||||
@@ -458,7 +447,7 @@ Always good to have a base line.
|
||||
|
||||
```
|
||||
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
@@ -472,7 +461,7 @@ tps = 139443.955722 (including connections establishing)
|
||||
tps = 142314.859075 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
@@ -486,7 +475,7 @@ tps = 150644.840891 (including connections establishing)
|
||||
tps = 152218.499430 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
@@ -500,7 +489,7 @@ tps = 152517.663404 (including connections establishing)
|
||||
tps = 153319.188482 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
|
||||
@@ -82,7 +82,6 @@ primary_reads_enabled = true
|
||||
# sha1: A hashing function based on SHA1
|
||||
#
|
||||
sharding_function = "pg_bigint_hash"
|
||||
sharding_key = "id"
|
||||
|
||||
# Credentials for users that may connect to this cluster
|
||||
[pools.sharded_db.users.0]
|
||||
|
||||
119
src/client.rs
119
src/client.rs
@@ -59,7 +59,6 @@ pub struct Client<S, T> {
|
||||
client_server_map: ClientServerMap,
|
||||
|
||||
/// Client parameters, e.g. user, client_encoding, etc.
|
||||
#[allow(dead_code)]
|
||||
parameters: HashMap<String, String>,
|
||||
|
||||
/// Statistics
|
||||
@@ -83,9 +82,6 @@ pub struct Client<S, T> {
|
||||
/// Postgres user for this client (This comes from the user in the connection string)
|
||||
username: String,
|
||||
|
||||
/// Application name for this client (defaults to pgcat)
|
||||
application_name: String,
|
||||
|
||||
/// Used to notify clients about an impending shutdown
|
||||
shutdown: Receiver<()>,
|
||||
}
|
||||
@@ -95,7 +91,7 @@ pub async fn client_entrypoint(
|
||||
mut stream: TcpStream,
|
||||
client_server_map: ClientServerMap,
|
||||
shutdown: Receiver<()>,
|
||||
drain: Sender<i32>,
|
||||
drain: Sender<i8>,
|
||||
admin_only: bool,
|
||||
) -> Result<(), Error> {
|
||||
// Figure out if the client wants TLS or not.
|
||||
@@ -204,7 +200,7 @@ pub async fn client_entrypoint(
|
||||
Ok(mut client) => {
|
||||
info!("Client {:?} connected (plain)", addr);
|
||||
|
||||
if !client.is_admin() {
|
||||
if client.is_admin() {
|
||||
let _ = drain.send(1).await;
|
||||
}
|
||||
|
||||
@@ -229,7 +225,7 @@ pub async fn client_entrypoint(
|
||||
Ok(mut client) => {
|
||||
info!("Client {:?} issued a cancel query request", addr);
|
||||
|
||||
if !client.is_admin() {
|
||||
if client.is_admin() {
|
||||
let _ = drain.send(1).await;
|
||||
}
|
||||
|
||||
@@ -369,11 +365,6 @@ where
|
||||
None => return Err(Error::ClientError),
|
||||
};
|
||||
|
||||
let application_name = match parameters.get("application_name") {
|
||||
Some(application_name) => application_name,
|
||||
None => "pgcat",
|
||||
};
|
||||
|
||||
let admin = ["pgcat", "pgbouncer"]
|
||||
.iter()
|
||||
.filter(|db| *db == &pool_name)
|
||||
@@ -502,7 +493,6 @@ where
|
||||
last_server_id: None,
|
||||
pool_name: pool_name.clone(),
|
||||
username: username.clone(),
|
||||
application_name: application_name.to_string(),
|
||||
shutdown,
|
||||
connected_to_server: false,
|
||||
});
|
||||
@@ -536,7 +526,6 @@ where
|
||||
last_server_id: None,
|
||||
pool_name: String::from("undefined"),
|
||||
username: String::from("undefined"),
|
||||
application_name: String::from("undefined"),
|
||||
shutdown,
|
||||
connected_to_server: false,
|
||||
});
|
||||
@@ -611,23 +600,10 @@ where
|
||||
message_result = read_message(&mut self.read) => message_result?
|
||||
};
|
||||
|
||||
match message[0] as char {
|
||||
// Buffer extended protocol messages even if we do not have
|
||||
// a server connection yet. Hopefully, when we get the S message
|
||||
// we'll be able to allocate a connection. Also, clients do not expect
|
||||
// the server to respond to these messages so even if we were not able to
|
||||
// allocate a connection, we wouldn't be able to send back an error message
|
||||
// to the client so we buffer them and defer the decision to error out or not
|
||||
// to when we get the S message
|
||||
'P' | 'B' | 'D' | 'E' => {
|
||||
self.buffer.put(&message[..]);
|
||||
continue;
|
||||
}
|
||||
'X' => {
|
||||
debug!("Client disconnecting");
|
||||
return Ok(());
|
||||
}
|
||||
_ => (),
|
||||
// Avoid taking a server if the client just wants to disconnect.
|
||||
if message[0] as char == 'X' {
|
||||
debug!("Client disconnecting");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Handle admin database queries.
|
||||
@@ -662,14 +638,14 @@ where
|
||||
// Normal query, not a custom command.
|
||||
None => {
|
||||
if query_router.query_parser_enabled() {
|
||||
query_router.infer_role_and_shard(message.clone());
|
||||
query_router.infer_role(message.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// SET SHARD TO
|
||||
Some((Command::SetShard, _)) => {
|
||||
// Selected shard is not configured.
|
||||
if query_router.shard() >= pool.shards() {
|
||||
let shard = query_router.shard();
|
||||
if shard >= pool.shards() {
|
||||
// Set the shard back to what it was.
|
||||
query_router.set_shard(current_shard);
|
||||
|
||||
@@ -677,7 +653,7 @@ where
|
||||
&mut self.write,
|
||||
&format!(
|
||||
"shard {} is more than configured {}, staying on shard {}",
|
||||
query_router.shard(),
|
||||
shard,
|
||||
pool.shards(),
|
||||
current_shard,
|
||||
),
|
||||
@@ -738,19 +714,22 @@ where
|
||||
conn
|
||||
}
|
||||
Err(err) => {
|
||||
// Client is attempting to get results from the server,
|
||||
// but we were unable to grab a connection from the pool
|
||||
// We'll send back an error message and clean the extended
|
||||
// protocol buffer
|
||||
if message[0] as char == 'S' {
|
||||
error!("Got Sync message but failed to get a connection from the pool");
|
||||
self.buffer.clear();
|
||||
}
|
||||
error_response(&mut self.write, "could not get connection from the pool")
|
||||
.await?;
|
||||
// Clients do not expect to get SystemError followed by ReadyForQuery in the middle
|
||||
// of extended protocol submission. So we will hold off on sending the actual error
|
||||
// message to the client until we get 'S' message
|
||||
match message[0] as char {
|
||||
'P' | 'B' | 'E' | 'D' => (),
|
||||
_ => {
|
||||
error_response(
|
||||
&mut self.write,
|
||||
"could not get connection from the pool",
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
};
|
||||
|
||||
error!("Could not get connection from pool: {:?}", err);
|
||||
|
||||
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
|
||||
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -780,10 +759,13 @@ where
|
||||
server.address()
|
||||
);
|
||||
|
||||
// Set application_name if any.
|
||||
// TODO: investigate other parameters and set them too.
|
||||
|
||||
// Set application_name.
|
||||
server.set_name(&self.application_name).await?;
|
||||
if self.parameters.contains_key("application_name") {
|
||||
server
|
||||
.set_name(&self.parameters["application_name"])
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Transaction loop. Multiple queries can be issued by the client here.
|
||||
// The connection belongs to the client until the transaction is over,
|
||||
@@ -800,7 +782,12 @@ where
|
||||
Err(err) => {
|
||||
// Client disconnected inside a transaction.
|
||||
// Clean up the server and re-use it.
|
||||
server.checkin_cleanup().await?;
|
||||
// This prevents connection thrashing by bad clients.
|
||||
if server.in_transaction() {
|
||||
server.query("ROLLBACK").await?;
|
||||
server.query("DISCARD ALL").await?;
|
||||
server.set_name("pgcat").await?;
|
||||
}
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
@@ -842,7 +829,16 @@ where
|
||||
|
||||
// Terminate
|
||||
'X' => {
|
||||
server.checkin_cleanup().await?;
|
||||
// Client closing. Rollback and clean up
|
||||
// connection before releasing into the pool.
|
||||
// Pgbouncer closes the connection which leads to
|
||||
// connection thrashing when clients misbehave.
|
||||
if server.in_transaction() {
|
||||
server.query("ROLLBACK").await?;
|
||||
server.query("DISCARD ALL").await?;
|
||||
server.set_name("pgcat").await?;
|
||||
}
|
||||
|
||||
self.release();
|
||||
|
||||
return Ok(());
|
||||
@@ -879,23 +875,6 @@ where
|
||||
|
||||
self.buffer.put(&original[..]);
|
||||
|
||||
// Clone after freeze does not allocate
|
||||
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
||||
|
||||
// Almost certainly true
|
||||
if first_message_code == 'P' {
|
||||
// Message layout
|
||||
// P followed by 32 int followed by null-terminated statement name
|
||||
// So message code should be in offset 0 of the buffer, first character
|
||||
// in prepared statement name would be index 5
|
||||
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
|
||||
if first_char_in_name != 0 {
|
||||
// This is a named prepared statement
|
||||
// Server connection state will need to be cleared at checkin
|
||||
server.mark_dirty();
|
||||
}
|
||||
}
|
||||
|
||||
self.send_and_receive_loop(
|
||||
code,
|
||||
self.buffer.clone(),
|
||||
@@ -963,10 +942,8 @@ where
|
||||
|
||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||
debug!("Releasing server back into the pool");
|
||||
server.checkin_cleanup().await?;
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
self.connected_to_server = false;
|
||||
|
||||
self.release();
|
||||
self.stats.client_idle(self.process_id, address.id);
|
||||
}
|
||||
|
||||
@@ -72,6 +72,9 @@ pub struct Address {
|
||||
/// The name of the Postgres database.
|
||||
pub database: String,
|
||||
|
||||
/// Default search_path.
|
||||
pub search_path: Option<String>,
|
||||
|
||||
/// Server role: replica, primary.
|
||||
pub role: Role,
|
||||
|
||||
@@ -98,6 +101,7 @@ impl Default for Address {
|
||||
address_index: 0,
|
||||
replica_number: 0,
|
||||
database: String::from("database"),
|
||||
search_path: None,
|
||||
role: Role::Replica,
|
||||
username: String::from("username"),
|
||||
pool_name: String::from("pool_name"),
|
||||
@@ -185,7 +189,6 @@ pub struct Pool {
|
||||
pub query_parser_enabled: bool,
|
||||
pub primary_reads_enabled: bool,
|
||||
pub sharding_function: String,
|
||||
pub sharding_key: Option<String>,
|
||||
pub shards: HashMap<String, Shard>,
|
||||
pub users: HashMap<String, User>,
|
||||
}
|
||||
@@ -199,7 +202,6 @@ impl Default for Pool {
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: true,
|
||||
sharding_function: "pg_bigint_hash".to_string(),
|
||||
sharding_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -208,6 +210,7 @@ impl Default for Pool {
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
pub struct Shard {
|
||||
pub database: String,
|
||||
pub search_path: Option<String>,
|
||||
pub servers: Vec<(String, u16, String)>,
|
||||
}
|
||||
|
||||
@@ -215,6 +218,7 @@ impl Default for Shard {
|
||||
fn default() -> Shard {
|
||||
Shard {
|
||||
servers: vec![(String::from("localhost"), 5432, String::from("primary"))],
|
||||
search_path: None,
|
||||
database: String::from("postgres"),
|
||||
}
|
||||
}
|
||||
|
||||
82
src/main.rs
82
src/main.rs
@@ -74,8 +74,7 @@ use crate::stats::{Collector, Reporter, REPORTER};
|
||||
|
||||
#[tokio::main(worker_threads = 4)]
|
||||
async fn main() {
|
||||
env_logger::builder().format_timestamp_micros().init();
|
||||
|
||||
env_logger::init();
|
||||
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
||||
|
||||
if !query_router::QueryRouter::setup() {
|
||||
@@ -155,31 +154,12 @@ async fn main() {
|
||||
|
||||
info!("Config autoreloader: {}", config.general.autoreload);
|
||||
|
||||
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
||||
let autoreload_client_server_map = client_server_map.clone();
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
autoreload_interval.tick().await;
|
||||
if config.general.autoreload {
|
||||
info!("Automatically reloading config");
|
||||
|
||||
match reload_config(autoreload_client_server_map.clone()).await {
|
||||
Ok(changed) => {
|
||||
if changed {
|
||||
get_config().show()
|
||||
}
|
||||
}
|
||||
Err(_) => (),
|
||||
};
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
|
||||
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
|
||||
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
|
||||
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
||||
let (shutdown_tx, _) = broadcast::channel::<()>(1);
|
||||
let (drain_tx, mut drain_rx) = mpsc::channel::<i32>(2048);
|
||||
let (drain_tx, mut drain_rx) = mpsc::channel::<i8>(2048);
|
||||
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
|
||||
|
||||
info!("Waiting for clients");
|
||||
@@ -202,6 +182,21 @@ async fn main() {
|
||||
get_config().show();
|
||||
},
|
||||
|
||||
_ = autoreload_interval.tick() => {
|
||||
if config.general.autoreload {
|
||||
info!("Automatically reloading config");
|
||||
|
||||
match reload_config(client_server_map.clone()).await {
|
||||
Ok(changed) => {
|
||||
if changed {
|
||||
get_config().show()
|
||||
}
|
||||
}
|
||||
Err(_) => (),
|
||||
};
|
||||
}
|
||||
},
|
||||
|
||||
// Initiate graceful shutdown sequence on sig int
|
||||
_ = interrupt_signal.recv() => {
|
||||
info!("Got SIGINT, waiting for client connection drain now");
|
||||
@@ -222,16 +217,13 @@ async fn main() {
|
||||
interval.tick().await;
|
||||
|
||||
// We're done waiting.
|
||||
error!("Graceful shutdown timed out. {} active clients being closed", total_clients);
|
||||
error!("Timed out waiting for clients");
|
||||
|
||||
let _ = exit_tx.send(()).await;
|
||||
});
|
||||
},
|
||||
|
||||
_ = term_signal.recv() => {
|
||||
info!("Got SIGTERM, closing with {} clients active", total_clients);
|
||||
break;
|
||||
},
|
||||
_ = term_signal.recv() => break,
|
||||
|
||||
new_client = listener.accept() => {
|
||||
let (socket, addr) = match new_client {
|
||||
@@ -308,18 +300,34 @@ async fn main() {
|
||||
///
|
||||
/// * `duration` - A duration of time
|
||||
fn format_duration(duration: &chrono::Duration) -> String {
|
||||
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
|
||||
let seconds = {
|
||||
let seconds = duration.num_seconds() % 60;
|
||||
if seconds < 10 {
|
||||
format!("0{}", seconds)
|
||||
} else {
|
||||
format!("{}", seconds)
|
||||
}
|
||||
};
|
||||
|
||||
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
|
||||
let minutes = {
|
||||
let minutes = duration.num_minutes() % 60;
|
||||
if minutes < 10 {
|
||||
format!("0{}", minutes)
|
||||
} else {
|
||||
format!("{}", minutes)
|
||||
}
|
||||
};
|
||||
|
||||
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
|
||||
|
||||
let hours = format!("{:0>2}", duration.num_hours() % 24);
|
||||
let hours = {
|
||||
let hours = duration.num_hours() % 24;
|
||||
if hours < 10 {
|
||||
format!("0{}", hours)
|
||||
} else {
|
||||
format!("{}", hours)
|
||||
}
|
||||
};
|
||||
|
||||
let days = duration.num_days().to_string();
|
||||
|
||||
format!(
|
||||
"{}d {}:{}:{}.{}",
|
||||
days, hours, minutes, seconds, milliseconds
|
||||
)
|
||||
format!("{}d {}:{}:{}", days, hours, minutes, seconds)
|
||||
}
|
||||
|
||||
@@ -111,7 +111,12 @@ where
|
||||
|
||||
/// Send the startup packet the server. We're pretending we're a Pg client.
|
||||
/// This tells the server which user we are and what database we want.
|
||||
pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Result<(), Error> {
|
||||
pub async fn startup(
|
||||
stream: &mut TcpStream,
|
||||
user: &str,
|
||||
database: &str,
|
||||
search_path: Option<&String>,
|
||||
) -> Result<(), Error> {
|
||||
let mut bytes = BytesMut::with_capacity(25);
|
||||
|
||||
bytes.put_i32(196608); // Protocol number
|
||||
@@ -125,6 +130,17 @@ pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Resu
|
||||
bytes.put(&b"database\0"[..]);
|
||||
bytes.put_slice(&database.as_bytes());
|
||||
bytes.put_u8(0);
|
||||
|
||||
// search_path
|
||||
match search_path {
|
||||
Some(search_path) => {
|
||||
bytes.put(&b"options\0"[..]);
|
||||
bytes.put_slice(&format!("-c search_path={}", search_path).as_bytes());
|
||||
bytes.put_u8(0);
|
||||
}
|
||||
None => (),
|
||||
};
|
||||
|
||||
bytes.put_u8(0); // Null terminator
|
||||
|
||||
let len = bytes.len() as i32 + 4i32;
|
||||
|
||||
26
src/pool.rs
26
src/pool.rs
@@ -8,7 +8,6 @@ use once_cell::sync::Lazy;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use regex::Regex;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
@@ -69,9 +68,6 @@ pub struct PoolSettings {
|
||||
|
||||
// Sharding function.
|
||||
pub sharding_function: ShardingFunction,
|
||||
|
||||
// Automatically detect sharding key in query.
|
||||
pub sharding_key_regex: Option<Regex>,
|
||||
}
|
||||
|
||||
impl Default for PoolSettings {
|
||||
@@ -84,7 +80,6 @@ impl Default for PoolSettings {
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: true,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
sharding_key_regex: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -160,6 +155,7 @@ impl ConnectionPool {
|
||||
let address = Address {
|
||||
id: address_id,
|
||||
database: shard.database.clone(),
|
||||
search_path: shard.search_path.clone(),
|
||||
host: server.0.clone(),
|
||||
port: server.1 as u16,
|
||||
role: role,
|
||||
@@ -234,20 +230,6 @@ impl ConnectionPool {
|
||||
"sha1" => ShardingFunction::Sha1,
|
||||
_ => unreachable!(),
|
||||
},
|
||||
sharding_key_regex: match &pool_config.sharding_key {
|
||||
Some(sharding_key) => match Regex::new(&format!(
|
||||
r"(?i) *{} *= *'?([0-9]+)'?",
|
||||
sharding_key
|
||||
)) {
|
||||
Ok(regex) => Some(regex),
|
||||
Err(err) => {
|
||||
error!("Sharding key regex error: {:?}", err);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
},
|
||||
|
||||
None => None,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
@@ -578,7 +560,11 @@ impl ManageConnection for ServerPool {
|
||||
|
||||
/// Attempts to create a new connection.
|
||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||
info!("Creating a new server connection {:?}", self.address);
|
||||
info!(
|
||||
"Creating a new connection to {:?} using user {:?}",
|
||||
self.address.name(),
|
||||
self.user.username
|
||||
);
|
||||
|
||||
// Put a temporary process_id into the stats
|
||||
// for server login.
|
||||
|
||||
@@ -55,8 +55,6 @@ pub struct QueryRouter {
|
||||
/// Include the primary into the replica pool for reads.
|
||||
primary_reads_enabled: bool,
|
||||
|
||||
set_manually: bool,
|
||||
|
||||
/// Pool configuration.
|
||||
pool_settings: PoolSettings,
|
||||
}
|
||||
@@ -99,7 +97,6 @@ impl QueryRouter {
|
||||
active_role: None,
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: false,
|
||||
set_manually: false,
|
||||
pool_settings: PoolSettings::default(),
|
||||
}
|
||||
}
|
||||
@@ -107,11 +104,6 @@ impl QueryRouter {
|
||||
/// Pool settings can change because of a config reload.
|
||||
pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) {
|
||||
self.pool_settings = pool_settings;
|
||||
|
||||
if !self.set_manually {
|
||||
self.query_parser_enabled = self.pool_settings.query_parser_enabled;
|
||||
self.primary_reads_enabled = self.pool_settings.primary_reads_enabled;
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to parse a command and execute it.
|
||||
@@ -213,8 +205,6 @@ impl QueryRouter {
|
||||
}
|
||||
|
||||
Command::SetServerRole => {
|
||||
self.set_manually = true;
|
||||
|
||||
self.active_role = match value.to_ascii_lowercase().as_ref() {
|
||||
"primary" => {
|
||||
self.query_parser_enabled = false;
|
||||
@@ -238,7 +228,7 @@ impl QueryRouter {
|
||||
|
||||
"default" => {
|
||||
self.active_role = self.pool_settings.default_role;
|
||||
self.query_parser_enabled = self.pool_settings.query_parser_enabled;
|
||||
self.query_parser_enabled = self.query_parser_enabled;
|
||||
self.active_role
|
||||
}
|
||||
|
||||
@@ -247,8 +237,6 @@ impl QueryRouter {
|
||||
}
|
||||
|
||||
Command::SetPrimaryReads => {
|
||||
self.set_manually = true;
|
||||
|
||||
if value == "on" {
|
||||
debug!("Setting primary reads to on");
|
||||
self.primary_reads_enabled = true;
|
||||
@@ -268,7 +256,7 @@ impl QueryRouter {
|
||||
}
|
||||
|
||||
/// Try to infer which server to connect to based on the contents of the query.
|
||||
pub fn infer_role_and_shard(&mut self, mut buf: BytesMut) -> bool {
|
||||
pub fn infer_role(&mut self, mut buf: BytesMut) -> bool {
|
||||
debug!("Inferring role");
|
||||
|
||||
let code = buf.get_u8() as char;
|
||||
@@ -309,31 +297,6 @@ impl QueryRouter {
|
||||
_ => return false,
|
||||
};
|
||||
|
||||
// First find the shard key
|
||||
match &self.pool_settings.sharding_key_regex {
|
||||
Some(re) => {
|
||||
match re.captures(&query) {
|
||||
Some(group) => match group.get(1) {
|
||||
Some(value) => {
|
||||
let value = value.as_str().parse::<i64>().unwrap();
|
||||
let sharder = Sharder::new(
|
||||
self.pool_settings.shards,
|
||||
self.pool_settings.sharding_function,
|
||||
);
|
||||
let shard = sharder.shard(value);
|
||||
self.active_shard = Some(shard);
|
||||
|
||||
debug!("Automatically routing to shard {}", shard);
|
||||
}
|
||||
None => (),
|
||||
},
|
||||
|
||||
None => (),
|
||||
};
|
||||
}
|
||||
None => (),
|
||||
};
|
||||
|
||||
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
|
||||
Ok(ast) => ast,
|
||||
Err(err) => {
|
||||
@@ -410,7 +373,7 @@ mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_and_shard_replica() {
|
||||
fn test_infer_role_replica() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None);
|
||||
@@ -419,25 +382,22 @@ mod test {
|
||||
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO off")) != None);
|
||||
|
||||
let queries = vec![
|
||||
simple_query("SELECT * FROM items WHERE id = 4"),
|
||||
simple_query("SELECT * FROM items WHERE id = 5"),
|
||||
simple_query(
|
||||
"SELECT id, name, value FROM items INNER JOIN prices ON item.id = prices.item_id",
|
||||
),
|
||||
simple_query("WITH t AS (SELECT * FROM items) SELECT * FROM t"),
|
||||
];
|
||||
|
||||
let shards = vec![0, 0, 0];
|
||||
|
||||
for (idx, query) in queries.iter().enumerate() {
|
||||
for query in queries {
|
||||
// It's a recognized query
|
||||
assert!(qr.infer_role_and_shard(query.clone()));
|
||||
assert!(qr.infer_role(query));
|
||||
assert_eq!(qr.role(), Some(Role::Replica));
|
||||
assert_eq!(qr.shard(), shards[idx]);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_and_shard_primary() {
|
||||
fn test_infer_role_primary() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
|
||||
@@ -450,24 +410,24 @@ mod test {
|
||||
|
||||
for query in queries {
|
||||
// It's a recognized query
|
||||
assert!(qr.infer_role_and_shard(query));
|
||||
assert!(qr.infer_role(query));
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_and_shard_primary_reads_enabled() {
|
||||
fn test_infer_role_primary_reads_enabled() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
let query = simple_query("SELECT * FROM items WHERE id = 5");
|
||||
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None);
|
||||
|
||||
assert!(qr.infer_role_and_shard(query));
|
||||
assert!(qr.infer_role(query));
|
||||
assert_eq!(qr.role(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_infer_role_and_shard_parse_prepared() {
|
||||
fn test_infer_role_parse_prepared() {
|
||||
QueryRouter::setup();
|
||||
let mut qr = QueryRouter::new();
|
||||
qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'"));
|
||||
@@ -482,7 +442,7 @@ mod test {
|
||||
res.put(prepared_stmt);
|
||||
res.put_i16(0);
|
||||
|
||||
assert!(qr.infer_role_and_shard(res));
|
||||
assert!(qr.infer_role(res));
|
||||
assert_eq!(qr.role(), Some(Role::Replica));
|
||||
}
|
||||
|
||||
@@ -646,17 +606,17 @@ mod test {
|
||||
assert_eq!(qr.role(), None);
|
||||
|
||||
let query = simple_query("INSERT INTO test_table VALUES (1)");
|
||||
assert_eq!(qr.infer_role_and_shard(query), true);
|
||||
assert_eq!(qr.infer_role(query), true);
|
||||
assert_eq!(qr.role(), Some(Role::Primary));
|
||||
|
||||
let query = simple_query("SELECT * FROM test_table");
|
||||
assert_eq!(qr.infer_role_and_shard(query), true);
|
||||
assert_eq!(qr.infer_role(query), true);
|
||||
assert_eq!(qr.role(), Some(Role::Replica));
|
||||
|
||||
assert!(qr.query_parser_enabled());
|
||||
let query = simple_query("SET SERVER ROLE TO 'default'");
|
||||
assert!(qr.try_execute_command(query) != None);
|
||||
assert!(!qr.query_parser_enabled());
|
||||
assert!(qr.query_parser_enabled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -669,8 +629,7 @@ mod test {
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
query_parser_enabled: true,
|
||||
primary_reads_enabled: true,
|
||||
sharding_key_regex: None,
|
||||
primary_reads_enabled: false,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
};
|
||||
let mut qr = QueryRouter::new();
|
||||
@@ -684,8 +643,8 @@ mod test {
|
||||
|
||||
assert_eq!(qr.active_role, None);
|
||||
assert_eq!(qr.active_shard, None);
|
||||
assert_eq!(qr.query_parser_enabled, true);
|
||||
assert_eq!(qr.primary_reads_enabled, true);
|
||||
assert_eq!(qr.query_parser_enabled, false);
|
||||
assert_eq!(qr.primary_reads_enabled, false);
|
||||
|
||||
let q1 = simple_query("SET SERVER ROLE TO 'primary'");
|
||||
assert!(qr.try_execute_command(q1) != None);
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
/// Implementation of the PostgreSQL server (database) protocol.
|
||||
/// Here we are pretending to the a Postgres client.
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use std::io::Read;
|
||||
use log::{debug, error, info, trace};
|
||||
use std::time::SystemTime;
|
||||
use tokio::io::{AsyncReadExt, BufReader};
|
||||
use tokio::net::{
|
||||
@@ -49,9 +48,6 @@ pub struct Server {
|
||||
/// Is the server broken? We'll remote it from the pool if so.
|
||||
bad: bool,
|
||||
|
||||
/// If server connection requires a DISCARD ALL before checkin
|
||||
needs_cleanup: bool,
|
||||
|
||||
/// Mapping of clients and servers used for query cancellation.
|
||||
client_server_map: ClientServerMap,
|
||||
|
||||
@@ -90,7 +86,13 @@ impl Server {
|
||||
trace!("Sending StartupMessage");
|
||||
|
||||
// StartupMessage
|
||||
startup(&mut stream, &user.username, database).await?;
|
||||
startup(
|
||||
&mut stream,
|
||||
&user.username,
|
||||
database,
|
||||
address.search_path.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut server_info = BytesMut::new();
|
||||
let mut process_id: i32 = 0;
|
||||
@@ -320,7 +322,6 @@ impl Server {
|
||||
in_transaction: false,
|
||||
data_available: false,
|
||||
bad: false,
|
||||
needs_cleanup: false,
|
||||
client_server_map: client_server_map,
|
||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||
stats: stats,
|
||||
@@ -445,29 +446,6 @@ impl Server {
|
||||
break;
|
||||
}
|
||||
|
||||
// CommandComplete
|
||||
'C' => {
|
||||
let mut command_tag = String::new();
|
||||
match message.reader().read_to_string(&mut command_tag) {
|
||||
Ok(_) => {
|
||||
// Non-exhaustive list of commands that are likely to change session variables/resources
|
||||
// which can leak between clients. This is a best effort to block bad clients
|
||||
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
||||
match command_tag.as_str() {
|
||||
"SET\0" | "PREPARE\0" => {
|
||||
debug!("Server connection marked for clean up");
|
||||
self.needs_cleanup = true;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
warn!("Encountered an error while parsing CommandTag {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DataRow
|
||||
'D' => {
|
||||
// More data is available after this message, this is not the end of the reply.
|
||||
@@ -581,43 +559,14 @@ impl Server {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform any necessary cleanup before putting the server
|
||||
/// connection back in the pool
|
||||
pub async fn checkin_cleanup(&mut self) -> Result<(), Error> {
|
||||
// Client disconnected with an open transaction on the server connection.
|
||||
// Pgbouncer behavior is to close the server connection but that can cause
|
||||
// server connection thrashing if clients repeatedly do this.
|
||||
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
|
||||
if self.in_transaction() {
|
||||
self.query("ROLLBACK").await?;
|
||||
}
|
||||
|
||||
// Client disconnected but it perfromed session-altering operations such as
|
||||
// SET statement_timeout to 1 or create a prepared statement. We clear that
|
||||
// to avoid leaking state between clients. For performance reasons we only
|
||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||
// it before each checkin.
|
||||
if self.needs_cleanup {
|
||||
self.query("DISCARD ALL").await?;
|
||||
self.needs_cleanup = false;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
/// A shorthand for `SET application_name = $1`.
|
||||
#[allow(dead_code)]
|
||||
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
||||
if self.application_name != name {
|
||||
self.application_name = name.to_string();
|
||||
// We don't want `SET application_name` to mark the server connection
|
||||
// as needing cleanup
|
||||
let needs_cleanup_before = self.needs_cleanup;
|
||||
|
||||
let result = Ok(self
|
||||
Ok(self
|
||||
.query(&format!("SET application_name = '{}'", name))
|
||||
.await?);
|
||||
self.needs_cleanup = needs_cleanup_before;
|
||||
return result;
|
||||
.await?)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
@@ -638,11 +587,6 @@ impl Server {
|
||||
pub fn last_activity(&self) -> SystemTime {
|
||||
self.last_activity
|
||||
}
|
||||
|
||||
// Marks a connection as needing DISCARD ALL at checkin
|
||||
pub fn mark_dirty(&mut self) {
|
||||
self.needs_cleanup = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Server {
|
||||
@@ -669,8 +613,7 @@ impl Drop for Server {
|
||||
let duration = now - self.connected_at;
|
||||
|
||||
info!(
|
||||
"Server connection closed {:?}, session duration: {}",
|
||||
self.address,
|
||||
"Server connection closed, session duration: {}",
|
||||
crate::format_duration(&duration)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
FROM rust:bullseye
|
||||
|
||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov sudo curl -y
|
||||
RUN cargo install cargo-binutils rustfilt
|
||||
RUN rustup component add llvm-tools-preview
|
||||
@@ -1,47 +0,0 @@
|
||||
version: "3"
|
||||
services:
|
||||
pg1:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"]
|
||||
pg2:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"]
|
||||
pg3:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"]
|
||||
pg4:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"]
|
||||
main:
|
||||
build: .
|
||||
command: ["bash", "/app/tests/docker/run.sh"]
|
||||
environment:
|
||||
RUSTFLAGS: "-C instrument-coverage"
|
||||
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
||||
volumes:
|
||||
- ../../:/app/
|
||||
- /app/target/
|
||||
@@ -1,21 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
rm /app/*.profraw || true
|
||||
rm /app/pgcat.profdata || true
|
||||
rm -rf /app/cov || true
|
||||
|
||||
cd /app/
|
||||
|
||||
cargo build
|
||||
cargo test --tests
|
||||
|
||||
bash .circleci/run_tests.sh
|
||||
|
||||
rust-profdata merge -sparse pgcat-*.profraw -o pgcat.profdata
|
||||
|
||||
rust-cov export -ignore-filename-regex="rustc|registry" -Xdemangler=rustfilt -instr-profile=pgcat.profdata --object ./target/debug/pgcat --format lcov > ./lcov.info
|
||||
|
||||
genhtml lcov.info --output-directory cov --prefix $(pwd)
|
||||
|
||||
rm /app/*.profraw
|
||||
rm /app/pgcat.profdata
|
||||
@@ -18,14 +18,9 @@ def pgcat_start():
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
source "https://rubygems.org"
|
||||
|
||||
gem "pg"
|
||||
gem "toml"
|
||||
gem "rspec"
|
||||
gem "rubocop"
|
||||
gem "toxiproxy"
|
||||
gem "activerecord"
|
||||
gem "rubocop"
|
||||
gem "toml", "~> 0.3.0"
|
||||
|
||||
@@ -13,7 +13,6 @@ GEM
|
||||
tzinfo (~> 2.0)
|
||||
ast (2.4.2)
|
||||
concurrent-ruby (1.1.10)
|
||||
diff-lcs (1.5.0)
|
||||
i18n (1.11.0)
|
||||
concurrent-ruby (~> 1.0)
|
||||
minitest (5.16.2)
|
||||
@@ -25,19 +24,6 @@ GEM
|
||||
rainbow (3.1.1)
|
||||
regexp_parser (2.3.1)
|
||||
rexml (3.2.5)
|
||||
rspec (3.11.0)
|
||||
rspec-core (~> 3.11.0)
|
||||
rspec-expectations (~> 3.11.0)
|
||||
rspec-mocks (~> 3.11.0)
|
||||
rspec-core (3.11.0)
|
||||
rspec-support (~> 3.11.0)
|
||||
rspec-expectations (3.11.0)
|
||||
diff-lcs (>= 1.2.0, < 2.0)
|
||||
rspec-support (~> 3.11.0)
|
||||
rspec-mocks (3.11.1)
|
||||
diff-lcs (>= 1.2.0, < 2.0)
|
||||
rspec-support (~> 3.11.0)
|
||||
rspec-support (3.11.0)
|
||||
rubocop (1.29.0)
|
||||
parallel (~> 1.10)
|
||||
parser (>= 3.1.0.0)
|
||||
@@ -52,23 +38,19 @@ GEM
|
||||
ruby-progressbar (1.11.0)
|
||||
toml (0.3.0)
|
||||
parslet (>= 1.8.0, < 3.0.0)
|
||||
toxiproxy (2.0.1)
|
||||
tzinfo (2.0.4)
|
||||
concurrent-ruby (~> 1.0)
|
||||
unicode-display_width (2.1.0)
|
||||
|
||||
PLATFORMS
|
||||
aarch64-linux
|
||||
arm64-darwin-21
|
||||
x86_64-linux
|
||||
|
||||
DEPENDENCIES
|
||||
activerecord
|
||||
pg
|
||||
rspec
|
||||
rubocop
|
||||
toml
|
||||
toxiproxy
|
||||
toml (~> 0.3.0)
|
||||
|
||||
BUNDLED WITH
|
||||
2.3.21
|
||||
2.3.7
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
require 'pg'
|
||||
require 'toxiproxy'
|
||||
|
||||
class PgInstance
|
||||
attr_reader :port
|
||||
attr_reader :username
|
||||
attr_reader :password
|
||||
attr_reader :database_name
|
||||
|
||||
def initialize(port, username, password, database_name)
|
||||
@original_port = port
|
||||
@toxiproxy_port = 10000 + port.to_i
|
||||
@port = @toxiproxy_port
|
||||
|
||||
@username = username
|
||||
@password = password
|
||||
@database_name = database_name
|
||||
@toxiproxy_name = "database_#{@original_port}"
|
||||
Toxiproxy.populate([{
|
||||
name: @toxiproxy_name,
|
||||
listen: "0.0.0.0:#{@toxiproxy_port}",
|
||||
upstream: "localhost:#{@original_port}",
|
||||
}])
|
||||
|
||||
# Toxiproxy server will outlive our PgInstance objects
|
||||
# so we want to destroy our proxies before exiting
|
||||
# Ruby finalizer is ideal for doing this
|
||||
ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
|
||||
end
|
||||
|
||||
def with_connection
|
||||
conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
|
||||
yield conn
|
||||
ensure
|
||||
conn&.close
|
||||
end
|
||||
|
||||
def reset
|
||||
reset_toxics
|
||||
reset_stats
|
||||
end
|
||||
|
||||
def toxiproxy
|
||||
Toxiproxy[@toxiproxy_name]
|
||||
end
|
||||
|
||||
def take_down
|
||||
if block_given?
|
||||
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
|
||||
else
|
||||
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
|
||||
end
|
||||
end
|
||||
|
||||
def add_latency(latency)
|
||||
if block_given?
|
||||
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
|
||||
else
|
||||
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
|
||||
end
|
||||
end
|
||||
|
||||
def delete_proxy
|
||||
Toxiproxy[@toxiproxy_name].delete
|
||||
end
|
||||
|
||||
def reset_toxics
|
||||
Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
|
||||
end
|
||||
|
||||
def reset_stats
|
||||
with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
|
||||
end
|
||||
|
||||
def count_query(query)
|
||||
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
|
||||
end
|
||||
|
||||
def count_select_1_plus_2
|
||||
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
|
||||
end
|
||||
end
|
||||
@@ -1,100 +0,0 @@
|
||||
require 'json'
|
||||
require 'ostruct'
|
||||
require_relative 'pgcat_process'
|
||||
require_relative 'pg_instance'
|
||||
|
||||
module Helpers
|
||||
module Pgcat
|
||||
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
"pool_size" => pool_size,
|
||||
"statement_timeout" => 0,
|
||||
"username" => "sharding_user"
|
||||
}
|
||||
|
||||
pgcat = PgcatProcess.new("info")
|
||||
primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||
primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1")
|
||||
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
|
||||
|
||||
pgcat_cfg = pgcat.current_config
|
||||
pgcat_cfg["pools"] = {
|
||||
"#{pool_name}" => {
|
||||
"default_role" => "any",
|
||||
"pool_mode" => pool_mode,
|
||||
"primary_reads_enabled" => false,
|
||||
"query_parser_enabled" => false,
|
||||
"sharding_function" => "pg_bigint_hash",
|
||||
"shards" => {
|
||||
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] },
|
||||
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
||||
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
||||
},
|
||||
"users" => { "0" => user }
|
||||
}
|
||||
}
|
||||
pgcat.update_config(pgcat_cfg)
|
||||
|
||||
pgcat.start
|
||||
pgcat.wait_until_ready
|
||||
|
||||
OpenStruct.new.tap do |struct|
|
||||
struct.pgcat = pgcat
|
||||
struct.shards = [primary0, primary1, primary2]
|
||||
struct.all_databases = [primary0, primary1, primary2]
|
||||
end
|
||||
end
|
||||
|
||||
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
"pool_size" => pool_size,
|
||||
"statement_timeout" => 0,
|
||||
"username" => "sharding_user"
|
||||
}
|
||||
|
||||
pgcat = PgcatProcess.new("info")
|
||||
pgcat_cfg = pgcat.current_config
|
||||
|
||||
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||
replica0 = PgInstance.new(7432, user["username"], user["password"], "shard0")
|
||||
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
|
||||
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
|
||||
|
||||
# Main proxy configs
|
||||
pgcat_cfg["pools"] = {
|
||||
"#{pool_name}" => {
|
||||
"default_role" => "any",
|
||||
"pool_mode" => pool_mode,
|
||||
"primary_reads_enabled" => false,
|
||||
"query_parser_enabled" => false,
|
||||
"sharding_function" => "pg_bigint_hash",
|
||||
"shards" => {
|
||||
"0" => {
|
||||
"database" => "shard0",
|
||||
"servers" => [
|
||||
["localhost", primary.port.to_s, "primary"],
|
||||
["localhost", replica0.port.to_s, "replica"],
|
||||
["localhost", replica1.port.to_s, "replica"],
|
||||
["localhost", replica2.port.to_s, "replica"]
|
||||
]
|
||||
},
|
||||
},
|
||||
"users" => { "0" => user }
|
||||
}
|
||||
}
|
||||
pgcat_cfg["general"]["port"] = pgcat.port
|
||||
pgcat.update_config(pgcat_cfg)
|
||||
pgcat.start
|
||||
pgcat.wait_until_ready
|
||||
|
||||
OpenStruct.new.tap do |struct|
|
||||
struct.pgcat = pgcat
|
||||
struct.primary = primary
|
||||
struct.replicas = [replica0, replica1, replica2]
|
||||
struct.all_databases = [primary, replica0, replica1, replica2]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,116 +0,0 @@
|
||||
require 'pg'
|
||||
require 'toml'
|
||||
require 'fileutils'
|
||||
require 'securerandom'
|
||||
|
||||
class PgcatProcess
|
||||
attr_reader :port
|
||||
attr_reader :pid
|
||||
|
||||
def self.finalize(pid, log_filename, config_filename)
|
||||
`kill #{pid}`
|
||||
File.delete(config_filename) if File.exist?(config_filename)
|
||||
File.delete(log_filename) if File.exist?(log_filename)
|
||||
end
|
||||
|
||||
def initialize(log_level)
|
||||
@env = {"RUST_LOG" => log_level}
|
||||
@port = rand(20000..32760)
|
||||
@log_level = log_level
|
||||
@log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log"
|
||||
@config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml"
|
||||
|
||||
@command = "../../target/debug/pgcat #{@config_filename}"
|
||||
|
||||
FileUtils.cp("../../pgcat.toml", @config_filename)
|
||||
cfg = current_config
|
||||
cfg["general"]["port"] = @port.to_i
|
||||
cfg["general"]["enable_prometheus_exporter"] = false
|
||||
|
||||
update_config(cfg)
|
||||
end
|
||||
|
||||
def logs
|
||||
File.read(@log_filename)
|
||||
end
|
||||
|
||||
def update_config(config_hash)
|
||||
@original_config = current_config
|
||||
output_to_write = TOML::Generator.new(config_hash).body
|
||||
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,')
|
||||
File.write(@config_filename, output_to_write)
|
||||
end
|
||||
|
||||
def current_config
|
||||
old_cfg = File.read(@config_filename)
|
||||
loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",')
|
||||
TOML.load(loadable_string)
|
||||
end
|
||||
|
||||
def reload_config
|
||||
`kill -s HUP #{@pid}`
|
||||
sleep 0.1
|
||||
end
|
||||
|
||||
def start
|
||||
raise StandardError, "Process is already started" unless @pid.nil?
|
||||
@pid = Process.spawn(@env, @command, err: @log_filename, out: @log_filename)
|
||||
ObjectSpace.define_finalizer(@log_filename, proc { PgcatProcess.finalize(@pid, @log_filename, @config_filename) })
|
||||
|
||||
return self
|
||||
end
|
||||
|
||||
def wait_until_ready
|
||||
exc = nil
|
||||
10.times do
|
||||
PG::connect(example_connection_string).close
|
||||
|
||||
return self
|
||||
rescue => e
|
||||
exc = e
|
||||
sleep(0.5)
|
||||
end
|
||||
puts exc
|
||||
raise StandardError, "Process #{@pid} never became ready. Logs #{logs}"
|
||||
end
|
||||
|
||||
def stop
|
||||
`kill #{@pid}`
|
||||
sleep 0.1
|
||||
end
|
||||
|
||||
def shutdown
|
||||
stop
|
||||
File.delete(@config_filename) if File.exist?(@config_filename)
|
||||
File.delete(@log_filename) if File.exist?(@log_filename)
|
||||
end
|
||||
|
||||
def admin_connection_string
|
||||
cfg = current_config
|
||||
username = cfg["general"]["admin_username"]
|
||||
password = cfg["general"]["admin_password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/pgcat"
|
||||
end
|
||||
|
||||
def connection_string(pool_name, username)
|
||||
cfg = current_config
|
||||
|
||||
user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username }
|
||||
password = user_obj["password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{pool_name}"
|
||||
end
|
||||
|
||||
def example_connection_string
|
||||
cfg = current_config
|
||||
first_pool_name = cfg["pools"].keys[0]
|
||||
|
||||
db_name = first_pool_name
|
||||
|
||||
username = cfg["pools"][first_pool_name]["users"]["0"]["username"]
|
||||
password = cfg["pools"][first_pool_name]["users"]["0"]["password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}"
|
||||
end
|
||||
end
|
||||
@@ -1,61 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
describe "Load Balancing" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
context "under regular circumstances" do
|
||||
it "balances query volume between all instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
query_count = QUERY_COUNT
|
||||
expected_share = query_count / processes.all_databases.count
|
||||
failed_count = 0
|
||||
|
||||
query_count.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when some replicas are down" do
|
||||
it "balances query volume between working instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||
failed_count = 0
|
||||
|
||||
processes[:replicas][0].take_down do
|
||||
processes[:replicas][1].take_down do
|
||||
QUERY_COUNT.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(2)
|
||||
processes.all_databases.each do |instance|
|
||||
queries_routed = instance.count_select_1_plus_2
|
||||
if processes.replicas[0..1].include?(instance)
|
||||
expect(queries_routed).to eq(0)
|
||||
else
|
||||
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,193 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
describe "Miscellaneous" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
describe "Extended Protocol handling" do
|
||||
it "does not send packets that client does not expect during extended protocol sequence" do
|
||||
new_configs = processes.pgcat.current_config
|
||||
|
||||
new_configs["general"]["connect_timeout"] = 500
|
||||
new_configs["general"]["ban_time"] = 1
|
||||
new_configs["general"]["shutdown_timeout"] = 1
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
|
||||
25.times do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
|
||||
ensure
|
||||
conn&.close
|
||||
end
|
||||
end
|
||||
|
||||
sleep(0.5)
|
||||
conn_under_test = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
stdout, stderr = with_captured_stdout_stderr do
|
||||
15.times do |i|
|
||||
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
|
||||
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
|
||||
sleep 1
|
||||
end
|
||||
end
|
||||
|
||||
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
|
||||
end
|
||||
end
|
||||
|
||||
describe "Pool recycling after config reload" do
|
||||
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
|
||||
|
||||
it "should update pools for new clients and clients that are no longer in transaction" do
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
server_conn.async_exec("BEGIN")
|
||||
|
||||
# No config change yet, client should set old configs
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard0')
|
||||
|
||||
# Swap shards
|
||||
new_config = processes.pgcat.current_config
|
||||
shard0 = new_config["pools"]["sharded_db"]["shards"]["0"]
|
||||
shard1 = new_config["pools"]["sharded_db"]["shards"]["1"]
|
||||
new_config["pools"]["sharded_db"]["shards"]["0"] = shard1
|
||||
new_config["pools"]["sharded_db"]["shards"]["1"] = shard0
|
||||
|
||||
# Reload config
|
||||
processes.pgcat.update_config(new_config)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
|
||||
# Config changed but transaction is in progress, client should set old configs
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard0')
|
||||
server_conn.async_exec("COMMIT")
|
||||
|
||||
# Transaction finished, client should get new configs
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard1')
|
||||
|
||||
# New connection should get new configs
|
||||
server_conn.close()
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard1')
|
||||
end
|
||||
end
|
||||
|
||||
describe "Clients closing connection in the middle of transaction" do
|
||||
it "sends a rollback to the server" do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("BEGIN")
|
||||
conn.close
|
||||
|
||||
expect(processes.primary.count_query("ROLLBACK")).to eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
describe "Server version reporting" do
|
||||
it "reports correct version for normal and admin databases" do
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
expect(server_conn.server_version).not_to eq(0)
|
||||
server_conn.close
|
||||
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
expect(admin_conn.server_version).not_to eq(0)
|
||||
admin_conn.close
|
||||
end
|
||||
end
|
||||
|
||||
describe "State clearance" do
|
||||
context "session mode" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }
|
||||
|
||||
it "Clears state before connection checkin" do
|
||||
# Both modes of operation should not raise
|
||||
# ERROR: prepared statement "prepared_q" already exists
|
||||
15.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
|
||||
conn.close
|
||||
end
|
||||
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
|
||||
conn.async_exec("SET statement_timeout to 1000")
|
||||
current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
|
||||
expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s")
|
||||
conn.close
|
||||
end
|
||||
|
||||
it "Does not send DISCARD ALL unless necessary" do
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
||||
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.async_exec("SET statement_timeout to 5000")
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
||||
end
|
||||
end
|
||||
|
||||
context "transaction mode" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }
|
||||
it "Clears state before connection checkin" do
|
||||
# Both modes of operation should not raise
|
||||
# ERROR: prepared statement "prepared_q" already exists
|
||||
15.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
|
||||
conn.close
|
||||
end
|
||||
|
||||
15.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.prepare("prepared_q", "SELECT $1")
|
||||
conn.close
|
||||
end
|
||||
end
|
||||
|
||||
it "Does not send DISCARD ALL unless necessary" do
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.exec_params("SELECT $1", [1])
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
||||
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.async_exec("SET statement_timeout to 5000")
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,81 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
|
||||
describe "Routing" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
describe "SET ROLE" do
|
||||
context "primary" do
|
||||
it "routes queries only to primary" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
|
||||
query_count = 30
|
||||
failed_count = 0
|
||||
|
||||
query_count.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to eq(0)
|
||||
end
|
||||
|
||||
expect(processes.primary.count_select_1_plus_2).to eq(query_count)
|
||||
end
|
||||
end
|
||||
context "replica" do
|
||||
it "routes queries only to replicas" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'replica'")
|
||||
|
||||
expected_share = QUERY_COUNT / processes.replicas.count
|
||||
failed_count = 0
|
||||
|
||||
QUERY_COUNT.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
|
||||
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
|
||||
expect(processes.primary.count_select_1_plus_2).to eq(0)
|
||||
end
|
||||
end
|
||||
|
||||
context "any" do
|
||||
it "routes queries to all instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'any'")
|
||||
|
||||
expected_share = QUERY_COUNT / processes.all_databases.count
|
||||
failed_count = 0
|
||||
|
||||
QUERY_COUNT.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
|
||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,21 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require 'pg'
|
||||
require_relative 'helpers/pgcat_helper'
|
||||
|
||||
QUERY_COUNT = 300
|
||||
MARGIN_OF_ERROR = 0.30
|
||||
|
||||
def with_captured_stdout_stderr
|
||||
sout = STDOUT.clone
|
||||
serr = STDERR.clone
|
||||
STDOUT.reopen("/tmp/out.txt", "w+")
|
||||
STDERR.reopen("/tmp/err.txt", "w+")
|
||||
STDOUT.sync = true
|
||||
STDERR.sync = true
|
||||
yield
|
||||
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
|
||||
ensure
|
||||
STDOUT.reopen(sout)
|
||||
STDERR.reopen(serr)
|
||||
end
|
||||
@@ -1,6 +1,93 @@
|
||||
# frozen_string_literal: true
|
||||
require 'pg'
|
||||
|
||||
require 'active_record'
|
||||
require 'pg'
|
||||
require 'toml'
|
||||
|
||||
$stdout.sync = true
|
||||
$stderr.sync = true
|
||||
|
||||
class ConfigEditor
|
||||
def initialize
|
||||
@original_config_text = File.read('../../.circleci/pgcat.toml')
|
||||
text_to_load = @original_config_text.gsub("5432", "\"5432\"")
|
||||
|
||||
@original_configs = TOML.load(text_to_load)
|
||||
end
|
||||
|
||||
def original_configs
|
||||
TOML.load(TOML::Generator.new(@original_configs).body)
|
||||
end
|
||||
|
||||
def with_modified_configs(new_configs)
|
||||
text_to_write = TOML::Generator.new(new_configs).body
|
||||
text_to_write = text_to_write.gsub("\"5432\"", "5432")
|
||||
File.write('../../.circleci/pgcat.toml', text_to_write)
|
||||
yield
|
||||
ensure
|
||||
File.write('../../.circleci/pgcat.toml', @original_config_text)
|
||||
end
|
||||
end
|
||||
|
||||
def with_captured_stdout_stderr
|
||||
sout = STDOUT.clone
|
||||
serr = STDERR.clone
|
||||
STDOUT.reopen("/tmp/out.txt", "w+")
|
||||
STDERR.reopen("/tmp/err.txt", "w+")
|
||||
STDOUT.sync = true
|
||||
STDERR.sync = true
|
||||
yield
|
||||
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
|
||||
ensure
|
||||
STDOUT.reopen(sout)
|
||||
STDERR.reopen(serr)
|
||||
end
|
||||
|
||||
|
||||
def test_extended_protocol_pooler_errors
|
||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||
|
||||
conf_editor = ConfigEditor.new
|
||||
new_configs = conf_editor.original_configs
|
||||
|
||||
# shorter timeouts
|
||||
new_configs["general"]["connect_timeout"] = 500
|
||||
new_configs["general"]["ban_time"] = 1
|
||||
new_configs["general"]["shutdown_timeout"] = 1
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1
|
||||
|
||||
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
|
||||
|
||||
conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db"
|
||||
10.times do
|
||||
Thread.new do
|
||||
conn = PG::connect(conn_str)
|
||||
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
|
||||
ensure
|
||||
conn&.close
|
||||
end
|
||||
end
|
||||
|
||||
sleep(0.5)
|
||||
conn_under_test = PG::connect(conn_str)
|
||||
stdout, stderr = with_captured_stdout_stderr do
|
||||
5.times do |i|
|
||||
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
|
||||
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
|
||||
sleep 1
|
||||
end
|
||||
end
|
||||
|
||||
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
|
||||
puts "Pool checkout errors not breaking clients passed"
|
||||
ensure
|
||||
sleep 1
|
||||
admin_conn.async_exec("RELOAD") # Reset state
|
||||
conn_under_test&.close
|
||||
end
|
||||
|
||||
test_extended_protocol_pooler_errors
|
||||
|
||||
# Uncomment these two to see all queries.
|
||||
# ActiveRecord.verbose_query_logs = true
|
||||
@@ -111,3 +198,68 @@ begin
|
||||
rescue ActiveRecord::StatementInvalid
|
||||
puts 'OK'
|
||||
end
|
||||
|
||||
# Test evil clients
|
||||
def poorly_behaved_client
|
||||
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
conn.async_exec 'BEGIN'
|
||||
conn.async_exec 'SELECT 1'
|
||||
|
||||
conn.close
|
||||
puts 'Bad client ok'
|
||||
end
|
||||
|
||||
25.times do
|
||||
poorly_behaved_client
|
||||
end
|
||||
|
||||
|
||||
def test_server_parameters
|
||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
raise StandardError, "Bad server version" if server_conn.server_version == 0
|
||||
server_conn.close
|
||||
|
||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||
raise StandardError, "Bad server version" if admin_conn.server_version == 0
|
||||
admin_conn.close
|
||||
|
||||
puts 'Server parameters ok'
|
||||
end
|
||||
|
||||
|
||||
def test_reload_pool_recycling
|
||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
|
||||
server_conn.async_exec("BEGIN")
|
||||
conf_editor = ConfigEditor.new
|
||||
new_configs = conf_editor.original_configs
|
||||
|
||||
# swap shards
|
||||
new_configs["pools"]["sharded_db"]["shards"]["0"]["database"] = "shard1"
|
||||
new_configs["pools"]["sharded_db"]["shards"]["1"]["database"] = "shard0"
|
||||
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
||||
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
||||
server_conn.async_exec("COMMIT;")
|
||||
|
||||
# Transaction finished, client should get new configs
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
||||
server_conn.close()
|
||||
|
||||
# New connection should get new configs
|
||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
||||
|
||||
ensure
|
||||
admin_conn.async_exec("RELOAD") # Go back to old state
|
||||
admin_conn.close
|
||||
server_conn.close
|
||||
puts "Pool Recycling okay!"
|
||||
end
|
||||
|
||||
test_reload_pool_recycling
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -70,35 +70,23 @@ GRANT CONNECT ON DATABASE shard2 TO other_user;
|
||||
GRANT CONNECT ON DATABASE some_db TO simple_user;
|
||||
|
||||
\c shard0
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||
GRANT ALL ON TABLE data TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO other_user;
|
||||
GRANT ALL ON TABLE data TO other_user;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||
|
||||
\c shard1
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||
GRANT ALL ON TABLE data TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO other_user;
|
||||
GRANT ALL ON TABLE data TO other_user;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||
|
||||
|
||||
\c shard2
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||
GRANT ALL ON TABLE data TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO other_user;
|
||||
GRANT ALL ON TABLE data TO other_user;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||
|
||||
\c some_db
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO simple_user;
|
||||
GRANT ALL ON SCHEMA public TO simple_user;
|
||||
GRANT ALL ON TABLE data TO simple_user;
|
||||
|
||||
Reference in New Issue
Block a user