mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-24 17:56:29 +00:00
Compare commits
15 Commits
levkk-sear
...
levkk-dist
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb55802917 | ||
|
|
075167431d | ||
|
|
9514b3b2d1 | ||
|
|
6d41640ea9 | ||
|
|
744ceada86 | ||
|
|
a5c8dd69b2 | ||
|
|
6a9a4db648 | ||
|
|
976b406468 | ||
|
|
417358c35d | ||
|
|
23a642f4a4 | ||
|
|
7f20dc3054 | ||
|
|
36339bd96f | ||
|
|
65b69b46d2 | ||
|
|
d48c04a7fb | ||
|
|
2628dec42e |
@@ -15,14 +15,34 @@ jobs:
|
|||||||
RUSTFLAGS: "-C instrument-coverage"
|
RUSTFLAGS: "-C instrument-coverage"
|
||||||
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
||||||
- image: postgres:14
|
- image: postgres:14
|
||||||
# auth:
|
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||||
# username: mydockerhub-user
|
|
||||||
# password: $DOCKERHUB_PASSWORD
|
|
||||||
environment:
|
environment:
|
||||||
POSTGRES_USER: postgres
|
POSTGRES_USER: postgres
|
||||||
POSTGRES_DB: postgres
|
POSTGRES_DB: postgres
|
||||||
POSTGRES_PASSWORD: postgres
|
POSTGRES_PASSWORD: postgres
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
- image: postgres:14
|
||||||
|
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
- image: postgres:14
|
||||||
|
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
- image: postgres:14
|
||||||
|
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
|
||||||
# Add steps to the job
|
# Add steps to the job
|
||||||
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
||||||
steps:
|
steps:
|
||||||
|
|||||||
@@ -16,6 +16,9 @@ function start_pgcat() {
|
|||||||
|
|
||||||
# Setup the database with shards and user
|
# Setup the database with shards and user
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
|
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||||
@@ -26,7 +29,7 @@ wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/downlo
|
|||||||
sudo dpkg -i toxiproxy-2.4.0.deb
|
sudo dpkg -i toxiproxy-2.4.0.deb
|
||||||
|
|
||||||
# Start Toxiproxy
|
# Start Toxiproxy
|
||||||
toxiproxy-server &
|
LOG_LEVEL=error toxiproxy-server &
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
# Create a database at port 5433, forward it to Postgres
|
# Create a database at port 5433, forward it to Postgres
|
||||||
@@ -87,7 +90,8 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again
|
|||||||
cd tests/ruby
|
cd tests/ruby
|
||||||
sudo gem install bundler
|
sudo gem install bundler
|
||||||
bundle install
|
bundle install
|
||||||
ruby tests.rb
|
bundle exec ruby tests.rb || exit 1
|
||||||
|
bundle exec rspec *_spec.rb || exit 1
|
||||||
cd ../..
|
cd ../..
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -95,7 +99,7 @@ cd ../..
|
|||||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||||
#
|
#
|
||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
python3 tests/python/tests.py
|
python3 tests/python/tests.py || exit 1
|
||||||
|
|
||||||
start_pgcat "info"
|
start_pgcat "info"
|
||||||
|
|
||||||
@@ -105,9 +109,9 @@ psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/n
|
|||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW LISTS' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW POOLS' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW VERSION' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
||||||
(! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
(! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||||
export PGPASSWORD=sharding_user
|
export PGPASSWORD=sharding_user
|
||||||
|
|||||||
25
README.md
25
README.md
@@ -1,8 +1,11 @@
|
|||||||
# PgCat
|

|
||||||
|
|
||||||
|
##### PgCat: PostgreSQL at petabyte scale
|
||||||
|
|
||||||
[](https://circleci.com/gh/levkk/pgcat/tree/main)
|
[](https://circleci.com/gh/levkk/pgcat/tree/main)
|
||||||
|
<a href="https://discord.gg/DmyJP3qJ7U" target="_blank">
|
||||||

|
<img src="https://img.shields.io/discord/1013868243036930099" alt="Join our Discord!" />
|
||||||
|
</a>
|
||||||
|
|
||||||
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
|
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
|
||||||
|
|
||||||
@@ -87,6 +90,14 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
|
|||||||
|
|
||||||
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
||||||
|
|
||||||
|
Run `cargo test` to run Rust tests.
|
||||||
|
|
||||||
|
Run the following commands to run Integration tests locally.
|
||||||
|
```
|
||||||
|
cd tests/docker/
|
||||||
|
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
|
||||||
|
```
|
||||||
|
|
||||||
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
||||||
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||||
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
||||||
@@ -447,7 +458,7 @@ Always good to have a base line.
|
|||||||
|
|
||||||
```
|
```
|
||||||
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||||
Password:
|
Password:
|
||||||
starting vacuum...end.
|
starting vacuum...end.
|
||||||
transaction type: <builtin: select only>
|
transaction type: <builtin: select only>
|
||||||
scaling factor: 1
|
scaling factor: 1
|
||||||
@@ -461,7 +472,7 @@ tps = 139443.955722 (including connections establishing)
|
|||||||
tps = 142314.859075 (excluding connections establishing)
|
tps = 142314.859075 (excluding connections establishing)
|
||||||
|
|
||||||
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||||
Password:
|
Password:
|
||||||
starting vacuum...end.
|
starting vacuum...end.
|
||||||
transaction type: <builtin: select only>
|
transaction type: <builtin: select only>
|
||||||
scaling factor: 1
|
scaling factor: 1
|
||||||
@@ -475,7 +486,7 @@ tps = 150644.840891 (including connections establishing)
|
|||||||
tps = 152218.499430 (excluding connections establishing)
|
tps = 152218.499430 (excluding connections establishing)
|
||||||
|
|
||||||
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||||
Password:
|
Password:
|
||||||
starting vacuum...end.
|
starting vacuum...end.
|
||||||
transaction type: <builtin: select only>
|
transaction type: <builtin: select only>
|
||||||
scaling factor: 1
|
scaling factor: 1
|
||||||
@@ -489,7 +500,7 @@ tps = 152517.663404 (including connections establishing)
|
|||||||
tps = 153319.188482 (excluding connections establishing)
|
tps = 153319.188482 (excluding connections establishing)
|
||||||
|
|
||||||
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||||
Password:
|
Password:
|
||||||
starting vacuum...end.
|
starting vacuum...end.
|
||||||
transaction type: <builtin: select only>
|
transaction type: <builtin: select only>
|
||||||
scaling factor: 1
|
scaling factor: 1
|
||||||
|
|||||||
390
src/client.rs
390
src/client.rs
@@ -59,6 +59,7 @@ pub struct Client<S, T> {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
|
|
||||||
/// Client parameters, e.g. user, client_encoding, etc.
|
/// Client parameters, e.g. user, client_encoding, etc.
|
||||||
|
#[allow(dead_code)]
|
||||||
parameters: HashMap<String, String>,
|
parameters: HashMap<String, String>,
|
||||||
|
|
||||||
/// Statistics
|
/// Statistics
|
||||||
@@ -82,8 +83,14 @@ pub struct Client<S, T> {
|
|||||||
/// Postgres user for this client (This comes from the user in the connection string)
|
/// Postgres user for this client (This comes from the user in the connection string)
|
||||||
username: String,
|
username: String,
|
||||||
|
|
||||||
|
/// Application name for this client (defaults to pgcat)
|
||||||
|
application_name: String,
|
||||||
|
|
||||||
/// Used to notify clients about an impending shutdown
|
/// Used to notify clients about an impending shutdown
|
||||||
shutdown: Receiver<()>,
|
shutdown: Receiver<()>,
|
||||||
|
|
||||||
|
// Sharding key column position
|
||||||
|
sharding_key_column: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Client entrypoint.
|
/// Client entrypoint.
|
||||||
@@ -91,7 +98,7 @@ pub async fn client_entrypoint(
|
|||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
shutdown: Receiver<()>,
|
shutdown: Receiver<()>,
|
||||||
drain: Sender<i8>,
|
drain: Sender<i32>,
|
||||||
admin_only: bool,
|
admin_only: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
// Figure out if the client wants TLS or not.
|
// Figure out if the client wants TLS or not.
|
||||||
@@ -200,7 +207,7 @@ pub async fn client_entrypoint(
|
|||||||
Ok(mut client) => {
|
Ok(mut client) => {
|
||||||
info!("Client {:?} connected (plain)", addr);
|
info!("Client {:?} connected (plain)", addr);
|
||||||
|
|
||||||
if client.is_admin() {
|
if !client.is_admin() {
|
||||||
let _ = drain.send(1).await;
|
let _ = drain.send(1).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -225,7 +232,7 @@ pub async fn client_entrypoint(
|
|||||||
Ok(mut client) => {
|
Ok(mut client) => {
|
||||||
info!("Client {:?} issued a cancel query request", addr);
|
info!("Client {:?} issued a cancel query request", addr);
|
||||||
|
|
||||||
if client.is_admin() {
|
if !client.is_admin() {
|
||||||
let _ = drain.send(1).await;
|
let _ = drain.send(1).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -365,6 +372,11 @@ where
|
|||||||
None => return Err(Error::ClientError),
|
None => return Err(Error::ClientError),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let application_name = match parameters.get("application_name") {
|
||||||
|
Some(application_name) => application_name,
|
||||||
|
None => "pgcat",
|
||||||
|
};
|
||||||
|
|
||||||
let admin = ["pgcat", "pgbouncer"]
|
let admin = ["pgcat", "pgbouncer"]
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|db| *db == &pool_name)
|
.filter(|db| *db == &pool_name)
|
||||||
@@ -493,8 +505,10 @@ where
|
|||||||
last_server_id: None,
|
last_server_id: None,
|
||||||
pool_name: pool_name.clone(),
|
pool_name: pool_name.clone(),
|
||||||
username: username.clone(),
|
username: username.clone(),
|
||||||
|
application_name: application_name.to_string(),
|
||||||
shutdown,
|
shutdown,
|
||||||
connected_to_server: false,
|
connected_to_server: false,
|
||||||
|
sharding_key_column: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -526,8 +540,10 @@ where
|
|||||||
last_server_id: None,
|
last_server_id: None,
|
||||||
pool_name: String::from("undefined"),
|
pool_name: String::from("undefined"),
|
||||||
username: String::from("undefined"),
|
username: String::from("undefined"),
|
||||||
|
application_name: String::from("undefined"),
|
||||||
shutdown,
|
shutdown,
|
||||||
connected_to_server: false,
|
connected_to_server: false,
|
||||||
|
sharding_key_column: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -600,10 +616,23 @@ where
|
|||||||
message_result = read_message(&mut self.read) => message_result?
|
message_result = read_message(&mut self.read) => message_result?
|
||||||
};
|
};
|
||||||
|
|
||||||
// Avoid taking a server if the client just wants to disconnect.
|
match message[0] as char {
|
||||||
if message[0] as char == 'X' {
|
// Buffer extended protocol messages even if we do not have
|
||||||
debug!("Client disconnecting");
|
// a server connection yet. Hopefully, when we get the S message
|
||||||
return Ok(());
|
// we'll be able to allocate a connection. Also, clients do not expect
|
||||||
|
// the server to respond to these messages so even if we were not able to
|
||||||
|
// allocate a connection, we wouldn't be able to send back an error message
|
||||||
|
// to the client so we buffer them and defer the decision to error out or not
|
||||||
|
// to when we get the S message
|
||||||
|
'P' | 'B' | 'D' | 'E' => {
|
||||||
|
self.buffer.put(&message[..]);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
'X' => {
|
||||||
|
debug!("Client disconnecting");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle admin database queries.
|
// Handle admin database queries.
|
||||||
@@ -700,6 +729,13 @@ where
|
|||||||
show_response(&mut self.write, "primary reads", &value).await?;
|
show_response(&mut self.write, "primary reads", &value).await?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// COPY .. SHARDING_KEY_COLUMN ..
|
||||||
|
Some((Command::StartShardedCopy, value)) => {
|
||||||
|
custom_protocol_response_ok(&mut self.write, "SHARDED_COPY").await?;
|
||||||
|
self.sharding_key_column = Some(value.parse::<usize>().unwrap());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!("Waiting for connection from pool");
|
debug!("Waiting for connection from pool");
|
||||||
@@ -714,22 +750,19 @@ where
|
|||||||
conn
|
conn
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Clients do not expect to get SystemError followed by ReadyForQuery in the middle
|
// Client is attempting to get results from the server,
|
||||||
// of extended protocol submission. So we will hold off on sending the actual error
|
// but we were unable to grab a connection from the pool
|
||||||
// message to the client until we get 'S' message
|
// We'll send back an error message and clean the extended
|
||||||
match message[0] as char {
|
// protocol buffer
|
||||||
'P' | 'B' | 'E' | 'D' => (),
|
if message[0] as char == 'S' {
|
||||||
_ => {
|
error!("Got Sync message but failed to get a connection from the pool");
|
||||||
error_response(
|
self.buffer.clear();
|
||||||
&mut self.write,
|
}
|
||||||
"could not get connection from the pool",
|
error_response(&mut self.write, "could not get connection from the pool")
|
||||||
)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
error!("Could not get connection from pool: {:?}", err);
|
|
||||||
|
|
||||||
|
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
|
||||||
|
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -759,13 +792,10 @@ where
|
|||||||
server.address()
|
server.address()
|
||||||
);
|
);
|
||||||
|
|
||||||
// Set application_name if any.
|
|
||||||
// TODO: investigate other parameters and set them too.
|
// TODO: investigate other parameters and set them too.
|
||||||
if self.parameters.contains_key("application_name") {
|
|
||||||
server
|
// Set application_name.
|
||||||
.set_name(&self.parameters["application_name"])
|
server.set_name(&self.application_name).await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Transaction loop. Multiple queries can be issued by the client here.
|
// Transaction loop. Multiple queries can be issued by the client here.
|
||||||
// The connection belongs to the client until the transaction is over,
|
// The connection belongs to the client until the transaction is over,
|
||||||
@@ -774,7 +804,7 @@ where
|
|||||||
// If the client is in session mode, no more custom protocol
|
// If the client is in session mode, no more custom protocol
|
||||||
// commands will be accepted.
|
// commands will be accepted.
|
||||||
loop {
|
loop {
|
||||||
let mut message = if message.len() == 0 {
|
let message = if message.len() == 0 {
|
||||||
trace!("Waiting for message inside transaction or in session mode");
|
trace!("Waiting for message inside transaction or in session mode");
|
||||||
|
|
||||||
match read_message(&mut self.read).await {
|
match read_message(&mut self.read).await {
|
||||||
@@ -782,12 +812,7 @@ where
|
|||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Client disconnected inside a transaction.
|
// Client disconnected inside a transaction.
|
||||||
// Clean up the server and re-use it.
|
// Clean up the server and re-use it.
|
||||||
// This prevents connection thrashing by bad clients.
|
server.checkin_cleanup().await?;
|
||||||
if server.in_transaction() {
|
|
||||||
server.query("ROLLBACK").await?;
|
|
||||||
server.query("DISCARD ALL").await?;
|
|
||||||
server.set_name("pgcat").await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
@@ -798,157 +823,174 @@ where
|
|||||||
msg
|
msg
|
||||||
};
|
};
|
||||||
|
|
||||||
// The message will be forwarded to the server intact. We still would like to
|
match self.handle_message(&pool, server, &address, message).await? {
|
||||||
// parse it below to figure out what to do with it.
|
Some(done) => if done { break; },
|
||||||
let original = message.clone();
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
|
||||||
let code = message.get_u8() as char;
|
|
||||||
let _len = message.get_i32() as usize;
|
|
||||||
|
|
||||||
trace!("Message: {}", code);
|
|
||||||
|
|
||||||
match code {
|
|
||||||
// ReadyForQuery
|
|
||||||
'Q' => {
|
|
||||||
debug!("Sending query to server");
|
|
||||||
|
|
||||||
self.send_and_receive_loop(code, original, server, &address, &pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
|
||||||
// Report transaction executed statistics.
|
|
||||||
self.stats.transaction(self.process_id, address.id);
|
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
|
||||||
if self.transaction_mode {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Terminate
|
|
||||||
'X' => {
|
|
||||||
// Client closing. Rollback and clean up
|
|
||||||
// connection before releasing into the pool.
|
|
||||||
// Pgbouncer closes the connection which leads to
|
|
||||||
// connection thrashing when clients misbehave.
|
|
||||||
if server.in_transaction() {
|
|
||||||
server.query("ROLLBACK").await?;
|
|
||||||
server.query("DISCARD ALL").await?;
|
|
||||||
server.set_name("pgcat").await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.release();
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse
|
|
||||||
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
|
||||||
'P' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bind
|
|
||||||
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
|
|
||||||
'B' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Describe
|
|
||||||
// Command a client can issue to describe a previously prepared named statement.
|
|
||||||
'D' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute
|
|
||||||
// Execute a prepared statement prepared in `P` and bound in `B`.
|
|
||||||
'E' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync
|
|
||||||
// Frontend (client) is asking for the query result now.
|
|
||||||
'S' => {
|
|
||||||
debug!("Sending query to server");
|
|
||||||
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
|
|
||||||
self.send_and_receive_loop(
|
|
||||||
code,
|
|
||||||
self.buffer.clone(),
|
|
||||||
server,
|
|
||||||
&address,
|
|
||||||
&pool,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
self.buffer.clear();
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
|
||||||
self.stats.transaction(self.process_id, address.id);
|
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
|
||||||
if self.transaction_mode {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CopyData
|
|
||||||
'd' => {
|
|
||||||
// Forward the data to the server,
|
|
||||||
// don't buffer it since it can be rather large.
|
|
||||||
self.send_server_message(server, original, &address, &pool)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// CopyDone or CopyFail
|
|
||||||
// Copy is done, successfully or not.
|
|
||||||
'c' | 'f' => {
|
|
||||||
self.send_server_message(server, original, &address, &pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let response = self.receive_server_message(server, &address, &pool).await?;
|
|
||||||
|
|
||||||
match write_all_half(&mut self.write, response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => {
|
|
||||||
server.mark_bad();
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
|
||||||
self.stats.transaction(self.process_id, address.id);
|
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
|
||||||
if self.transaction_mode {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Some unexpected message. We either did not implement the protocol correctly
|
|
||||||
// or this is not a Postgres client we're talking to.
|
|
||||||
_ => {
|
|
||||||
error!("Unexpected code: {}", code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||||
debug!("Releasing server back into the pool");
|
debug!("Releasing server back into the pool");
|
||||||
|
server.checkin_cleanup().await?;
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
self.connected_to_server = false;
|
self.connected_to_server = false;
|
||||||
|
|
||||||
self.release();
|
self.release();
|
||||||
self.stats.client_idle(self.process_id, address.id);
|
self.stats.client_idle(self.process_id, address.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_message(&mut self, pool: &ConnectionPool, server: &mut Server, address: &Address, mut message: BytesMut) -> Result<Option<bool>, Error> {
|
||||||
|
let original = message.clone();
|
||||||
|
let code = message.get_u8() as char;
|
||||||
|
let _len = message.get_i32() as usize;
|
||||||
|
|
||||||
|
trace!("Message: {}", code);
|
||||||
|
|
||||||
|
match code {
|
||||||
|
// ReadyForQuery
|
||||||
|
'Q' => {
|
||||||
|
debug!("Sending query to server");
|
||||||
|
|
||||||
|
self.send_and_receive_loop(code, original, server, &address, &pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if !server.in_transaction() {
|
||||||
|
// Report transaction executed statistics.
|
||||||
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
|
// Release server back to the pool if we are in transaction mode.
|
||||||
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
|
if self.transaction_mode {
|
||||||
|
return Ok(Some(true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate
|
||||||
|
'X' => {
|
||||||
|
server.checkin_cleanup().await?;
|
||||||
|
self.release();
|
||||||
|
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse
|
||||||
|
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
||||||
|
'P' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bind
|
||||||
|
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
|
||||||
|
'B' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Describe
|
||||||
|
// Command a client can issue to describe a previously prepared named statement.
|
||||||
|
'D' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute
|
||||||
|
// Execute a prepared statement prepared in `P` and bound in `B`.
|
||||||
|
'E' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync
|
||||||
|
// Frontend (client) is asking for the query result now.
|
||||||
|
'S' => {
|
||||||
|
debug!("Sending query to server");
|
||||||
|
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
|
||||||
|
// Clone after freeze does not allocate
|
||||||
|
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
||||||
|
|
||||||
|
// Almost certainly true
|
||||||
|
if first_message_code == 'P' {
|
||||||
|
// Message layout
|
||||||
|
// P followed by 32 int followed by null-terminated statement name
|
||||||
|
// So message code should be in offset 0 of the buffer, first character
|
||||||
|
// in prepared statement name would be index 5
|
||||||
|
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
|
||||||
|
if first_char_in_name != 0 {
|
||||||
|
// This is a named prepared statement
|
||||||
|
// Server connection state will need to be cleared at checkin
|
||||||
|
server.mark_dirty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.send_and_receive_loop(
|
||||||
|
code,
|
||||||
|
self.buffer.clone(),
|
||||||
|
server,
|
||||||
|
&address,
|
||||||
|
&pool,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.buffer.clear();
|
||||||
|
|
||||||
|
if !server.in_transaction() {
|
||||||
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
|
// Release server back to the pool if we are in transaction mode.
|
||||||
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
|
if self.transaction_mode {
|
||||||
|
return Ok(Some(true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CopyData
|
||||||
|
'd' => {
|
||||||
|
// Forward the data to the server,
|
||||||
|
// don't buffer it since it can be rather large.
|
||||||
|
self.send_server_message(server, original, &address, &pool)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// CopyDone or CopyFail
|
||||||
|
// Copy is done, successfully or not.
|
||||||
|
'c' | 'f' => {
|
||||||
|
self.send_server_message(server, original, &address, &pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let response = self.receive_server_message(server, &address, &pool).await?;
|
||||||
|
|
||||||
|
match write_all_half(&mut self.write, response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
server.mark_bad();
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !server.in_transaction() {
|
||||||
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
|
// Release server back to the pool if we are in transaction mode.
|
||||||
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
|
if self.transaction_mode {
|
||||||
|
return Ok(Some(true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Some unexpected message. We either did not implement the protocol correctly
|
||||||
|
// or this is not a Postgres client we're talking to.
|
||||||
|
_ => {
|
||||||
|
error!("Unexpected code: {}", code);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Some(false))
|
||||||
|
}
|
||||||
|
|
||||||
/// Release the server from the client: it can't cancel its queries anymore.
|
/// Release the server from the client: it can't cancel its queries anymore.
|
||||||
pub fn release(&self) {
|
pub fn release(&self) {
|
||||||
let mut guard = self.client_server_map.lock();
|
let mut guard = self.client_server_map.lock();
|
||||||
|
|||||||
82
src/main.rs
82
src/main.rs
@@ -74,7 +74,8 @@ use crate::stats::{Collector, Reporter, REPORTER};
|
|||||||
|
|
||||||
#[tokio::main(worker_threads = 4)]
|
#[tokio::main(worker_threads = 4)]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
env_logger::init();
|
env_logger::builder().format_timestamp_micros().init();
|
||||||
|
|
||||||
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
||||||
|
|
||||||
if !query_router::QueryRouter::setup() {
|
if !query_router::QueryRouter::setup() {
|
||||||
@@ -154,12 +155,31 @@ async fn main() {
|
|||||||
|
|
||||||
info!("Config autoreloader: {}", config.general.autoreload);
|
info!("Config autoreloader: {}", config.general.autoreload);
|
||||||
|
|
||||||
|
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
||||||
|
let autoreload_client_server_map = client_server_map.clone();
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
loop {
|
||||||
|
autoreload_interval.tick().await;
|
||||||
|
if config.general.autoreload {
|
||||||
|
info!("Automatically reloading config");
|
||||||
|
|
||||||
|
match reload_config(autoreload_client_server_map.clone()).await {
|
||||||
|
Ok(changed) => {
|
||||||
|
if changed {
|
||||||
|
get_config().show()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => (),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
|
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
|
||||||
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
|
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
|
||||||
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
|
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
|
||||||
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
|
||||||
let (shutdown_tx, _) = broadcast::channel::<()>(1);
|
let (shutdown_tx, _) = broadcast::channel::<()>(1);
|
||||||
let (drain_tx, mut drain_rx) = mpsc::channel::<i8>(2048);
|
let (drain_tx, mut drain_rx) = mpsc::channel::<i32>(2048);
|
||||||
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
|
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
|
||||||
|
|
||||||
info!("Waiting for clients");
|
info!("Waiting for clients");
|
||||||
@@ -182,21 +202,6 @@ async fn main() {
|
|||||||
get_config().show();
|
get_config().show();
|
||||||
},
|
},
|
||||||
|
|
||||||
_ = autoreload_interval.tick() => {
|
|
||||||
if config.general.autoreload {
|
|
||||||
info!("Automatically reloading config");
|
|
||||||
|
|
||||||
match reload_config(client_server_map.clone()).await {
|
|
||||||
Ok(changed) => {
|
|
||||||
if changed {
|
|
||||||
get_config().show()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => (),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
// Initiate graceful shutdown sequence on sig int
|
// Initiate graceful shutdown sequence on sig int
|
||||||
_ = interrupt_signal.recv() => {
|
_ = interrupt_signal.recv() => {
|
||||||
info!("Got SIGINT, waiting for client connection drain now");
|
info!("Got SIGINT, waiting for client connection drain now");
|
||||||
@@ -217,13 +222,16 @@ async fn main() {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
||||||
// We're done waiting.
|
// We're done waiting.
|
||||||
error!("Timed out waiting for clients");
|
error!("Graceful shutdown timed out. {} active clients being closed", total_clients);
|
||||||
|
|
||||||
let _ = exit_tx.send(()).await;
|
let _ = exit_tx.send(()).await;
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
||||||
_ = term_signal.recv() => break,
|
_ = term_signal.recv() => {
|
||||||
|
info!("Got SIGTERM, closing with {} clients active", total_clients);
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
|
||||||
new_client = listener.accept() => {
|
new_client = listener.accept() => {
|
||||||
let (socket, addr) = match new_client {
|
let (socket, addr) = match new_client {
|
||||||
@@ -300,34 +308,18 @@ async fn main() {
|
|||||||
///
|
///
|
||||||
/// * `duration` - A duration of time
|
/// * `duration` - A duration of time
|
||||||
fn format_duration(duration: &chrono::Duration) -> String {
|
fn format_duration(duration: &chrono::Duration) -> String {
|
||||||
let seconds = {
|
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
|
||||||
let seconds = duration.num_seconds() % 60;
|
|
||||||
if seconds < 10 {
|
|
||||||
format!("0{}", seconds)
|
|
||||||
} else {
|
|
||||||
format!("{}", seconds)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let minutes = {
|
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
|
||||||
let minutes = duration.num_minutes() % 60;
|
|
||||||
if minutes < 10 {
|
|
||||||
format!("0{}", minutes)
|
|
||||||
} else {
|
|
||||||
format!("{}", minutes)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let hours = {
|
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
|
||||||
let hours = duration.num_hours() % 24;
|
|
||||||
if hours < 10 {
|
let hours = format!("{:0>2}", duration.num_hours() % 24);
|
||||||
format!("0{}", hours)
|
|
||||||
} else {
|
|
||||||
format!("{}", hours)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let days = duration.num_days().to_string();
|
let days = duration.num_days().to_string();
|
||||||
|
|
||||||
format!("{}d {}:{}:{}", days, hours, minutes, seconds)
|
format!(
|
||||||
|
"{}d {}:{}:{}.{}",
|
||||||
|
days, hours, minutes, seconds, milliseconds
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -559,11 +559,7 @@ impl ManageConnection for ServerPool {
|
|||||||
|
|
||||||
/// Attempts to create a new connection.
|
/// Attempts to create a new connection.
|
||||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||||
info!(
|
info!("Creating a new server connection {:?}", self.address);
|
||||||
"Creating a new connection to {:?} using user {:?}",
|
|
||||||
self.address.name(),
|
|
||||||
self.user.username
|
|
||||||
);
|
|
||||||
|
|
||||||
// Put a temporary process_id into the stats
|
// Put a temporary process_id into the stats
|
||||||
// for server login.
|
// for server login.
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ use crate::pool::PoolSettings;
|
|||||||
use crate::sharding::Sharder;
|
use crate::sharding::Sharder;
|
||||||
|
|
||||||
/// Regexes used to parse custom commands.
|
/// Regexes used to parse custom commands.
|
||||||
const CUSTOM_SQL_REGEXES: [&str; 7] = [
|
const CUSTOM_SQL_REGEXES: [&str; 8] = [
|
||||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||||
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
||||||
r"(?i)^ *SHOW SHARD *;? *$",
|
r"(?i)^ *SHOW SHARD *;? *$",
|
||||||
@@ -21,6 +21,7 @@ const CUSTOM_SQL_REGEXES: [&str; 7] = [
|
|||||||
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
||||||
r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$",
|
r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$",
|
||||||
r"(?i)^ *SHOW PRIMARY READS *;? *$",
|
r"(?i)^ *SHOW PRIMARY READS *;? *$",
|
||||||
|
r"(?i)^ *SHARDED_COPY '?([0-9]+)'? *;? *$",
|
||||||
];
|
];
|
||||||
|
|
||||||
/// Custom commands.
|
/// Custom commands.
|
||||||
@@ -33,6 +34,7 @@ pub enum Command {
|
|||||||
ShowServerRole,
|
ShowServerRole,
|
||||||
SetPrimaryReads,
|
SetPrimaryReads,
|
||||||
ShowPrimaryReads,
|
ShowPrimaryReads,
|
||||||
|
StartShardedCopy,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Quickly test for match when a query is received.
|
/// Quickly test for match when a query is received.
|
||||||
@@ -57,6 +59,9 @@ pub struct QueryRouter {
|
|||||||
|
|
||||||
/// Pool configuration.
|
/// Pool configuration.
|
||||||
pool_settings: PoolSettings,
|
pool_settings: PoolSettings,
|
||||||
|
|
||||||
|
// Sharding key column
|
||||||
|
sharding_key_column: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QueryRouter {
|
impl QueryRouter {
|
||||||
@@ -98,6 +103,7 @@ impl QueryRouter {
|
|||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
primary_reads_enabled: false,
|
primary_reads_enabled: false,
|
||||||
pool_settings: PoolSettings::default(),
|
pool_settings: PoolSettings::default(),
|
||||||
|
sharding_key_column: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,6 +151,7 @@ impl QueryRouter {
|
|||||||
4 => Command::ShowServerRole,
|
4 => Command::ShowServerRole,
|
||||||
5 => Command::SetPrimaryReads,
|
5 => Command::SetPrimaryReads,
|
||||||
6 => Command::ShowPrimaryReads,
|
6 => Command::ShowPrimaryReads,
|
||||||
|
7 => Command::StartShardedCopy,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -152,7 +159,8 @@ impl QueryRouter {
|
|||||||
Command::SetShardingKey
|
Command::SetShardingKey
|
||||||
| Command::SetShard
|
| Command::SetShard
|
||||||
| Command::SetServerRole
|
| Command::SetServerRole
|
||||||
| Command::SetPrimaryReads => {
|
| Command::SetPrimaryReads
|
||||||
|
| Command::StartShardedCopy => {
|
||||||
// Capture value. I know this re-runs the regex engine, but I haven't
|
// Capture value. I know this re-runs the regex engine, but I haven't
|
||||||
// figured out a better way just yet. I think I can write a single Regex
|
// figured out a better way just yet. I think I can write a single Regex
|
||||||
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
|
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
|
||||||
@@ -204,6 +212,13 @@ impl QueryRouter {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Command::StartShardedCopy => {
|
||||||
|
self.sharding_key_column = match value.parse::<usize>() {
|
||||||
|
Ok(value) => Some(value),
|
||||||
|
Err(_) => return None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Command::SetServerRole => {
|
Command::SetServerRole => {
|
||||||
self.active_role = match value.to_ascii_lowercase().as_ref() {
|
self.active_role = match value.to_ascii_lowercase().as_ref() {
|
||||||
"primary" => {
|
"primary" => {
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
/// Implementation of the PostgreSQL server (database) protocol.
|
/// Implementation of the PostgreSQL server (database) protocol.
|
||||||
/// Here we are pretending to the a Postgres client.
|
/// Here we are pretending to the a Postgres client.
|
||||||
use bytes::{Buf, BufMut, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
use log::{debug, error, info, trace};
|
use log::{debug, error, info, trace, warn};
|
||||||
|
use std::io::Read;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tokio::io::{AsyncReadExt, BufReader};
|
use tokio::io::{AsyncReadExt, BufReader};
|
||||||
use tokio::net::{
|
use tokio::net::{
|
||||||
@@ -48,6 +49,9 @@ pub struct Server {
|
|||||||
/// Is the server broken? We'll remote it from the pool if so.
|
/// Is the server broken? We'll remote it from the pool if so.
|
||||||
bad: bool,
|
bad: bool,
|
||||||
|
|
||||||
|
/// If server connection requires a DISCARD ALL before checkin
|
||||||
|
needs_cleanup: bool,
|
||||||
|
|
||||||
/// Mapping of clients and servers used for query cancellation.
|
/// Mapping of clients and servers used for query cancellation.
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
|
|
||||||
@@ -316,6 +320,7 @@ impl Server {
|
|||||||
in_transaction: false,
|
in_transaction: false,
|
||||||
data_available: false,
|
data_available: false,
|
||||||
bad: false,
|
bad: false,
|
||||||
|
needs_cleanup: false,
|
||||||
client_server_map: client_server_map,
|
client_server_map: client_server_map,
|
||||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||||
stats: stats,
|
stats: stats,
|
||||||
@@ -440,6 +445,29 @@ impl Server {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CommandComplete
|
||||||
|
'C' => {
|
||||||
|
let mut command_tag = String::new();
|
||||||
|
match message.reader().read_to_string(&mut command_tag) {
|
||||||
|
Ok(_) => {
|
||||||
|
// Non-exhaustive list of commands that are likely to change session variables/resources
|
||||||
|
// which can leak between clients. This is a best effort to block bad clients
|
||||||
|
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
||||||
|
match command_tag.as_str() {
|
||||||
|
"SET\0" | "PREPARE\0" => {
|
||||||
|
debug!("Server connection marked for clean up");
|
||||||
|
self.needs_cleanup = true;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
warn!("Encountered an error while parsing CommandTag {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// DataRow
|
// DataRow
|
||||||
'D' => {
|
'D' => {
|
||||||
// More data is available after this message, this is not the end of the reply.
|
// More data is available after this message, this is not the end of the reply.
|
||||||
@@ -553,14 +581,43 @@ impl Server {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Perform any necessary cleanup before putting the server
|
||||||
|
/// connection back in the pool
|
||||||
|
pub async fn checkin_cleanup(&mut self) -> Result<(), Error> {
|
||||||
|
// Client disconnected with an open transaction on the server connection.
|
||||||
|
// Pgbouncer behavior is to close the server connection but that can cause
|
||||||
|
// server connection thrashing if clients repeatedly do this.
|
||||||
|
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
|
||||||
|
if self.in_transaction() {
|
||||||
|
self.query("ROLLBACK").await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client disconnected but it perfromed session-altering operations such as
|
||||||
|
// SET statement_timeout to 1 or create a prepared statement. We clear that
|
||||||
|
// to avoid leaking state between clients. For performance reasons we only
|
||||||
|
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||||
|
// it before each checkin.
|
||||||
|
if self.needs_cleanup {
|
||||||
|
self.query("DISCARD ALL").await?;
|
||||||
|
self.needs_cleanup = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
/// A shorthand for `SET application_name = $1`.
|
/// A shorthand for `SET application_name = $1`.
|
||||||
#[allow(dead_code)]
|
|
||||||
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
||||||
if self.application_name != name {
|
if self.application_name != name {
|
||||||
self.application_name = name.to_string();
|
self.application_name = name.to_string();
|
||||||
Ok(self
|
// We don't want `SET application_name` to mark the server connection
|
||||||
|
// as needing cleanup
|
||||||
|
let needs_cleanup_before = self.needs_cleanup;
|
||||||
|
|
||||||
|
let result = Ok(self
|
||||||
.query(&format!("SET application_name = '{}'", name))
|
.query(&format!("SET application_name = '{}'", name))
|
||||||
.await?)
|
.await?);
|
||||||
|
self.needs_cleanup = needs_cleanup_before;
|
||||||
|
return result;
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -581,6 +638,11 @@ impl Server {
|
|||||||
pub fn last_activity(&self) -> SystemTime {
|
pub fn last_activity(&self) -> SystemTime {
|
||||||
self.last_activity
|
self.last_activity
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Marks a connection as needing DISCARD ALL at checkin
|
||||||
|
pub fn mark_dirty(&mut self) {
|
||||||
|
self.needs_cleanup = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Server {
|
impl Drop for Server {
|
||||||
@@ -607,7 +669,8 @@ impl Drop for Server {
|
|||||||
let duration = now - self.connected_at;
|
let duration = now - self.connected_at;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Server connection closed, session duration: {}",
|
"Server connection closed {:?}, session duration: {}",
|
||||||
|
self.address,
|
||||||
crate::format_duration(&duration)
|
crate::format_duration(&duration)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
5
tests/docker/Dockerfile
Normal file
5
tests/docker/Dockerfile
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
FROM rust:bullseye
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov sudo curl -y
|
||||||
|
RUN cargo install cargo-binutils rustfilt
|
||||||
|
RUN rustup component add llvm-tools-preview
|
||||||
47
tests/docker/docker-compose.yml
Normal file
47
tests/docker/docker-compose.yml
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
version: "3"
|
||||||
|
services:
|
||||||
|
pg1:
|
||||||
|
image: postgres:14
|
||||||
|
network_mode: "service:main"
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"]
|
||||||
|
pg2:
|
||||||
|
image: postgres:14
|
||||||
|
network_mode: "service:main"
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"]
|
||||||
|
pg3:
|
||||||
|
image: postgres:14
|
||||||
|
network_mode: "service:main"
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"]
|
||||||
|
pg4:
|
||||||
|
image: postgres:14
|
||||||
|
network_mode: "service:main"
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
|
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"]
|
||||||
|
main:
|
||||||
|
build: .
|
||||||
|
command: ["bash", "/app/tests/docker/run.sh"]
|
||||||
|
environment:
|
||||||
|
RUSTFLAGS: "-C instrument-coverage"
|
||||||
|
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
||||||
|
volumes:
|
||||||
|
- ../../:/app/
|
||||||
|
- /app/target/
|
||||||
21
tests/docker/run.sh
Normal file
21
tests/docker/run.sh
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
rm /app/*.profraw || true
|
||||||
|
rm /app/pgcat.profdata || true
|
||||||
|
rm -rf /app/cov || true
|
||||||
|
|
||||||
|
cd /app/
|
||||||
|
|
||||||
|
cargo build
|
||||||
|
cargo test --tests
|
||||||
|
|
||||||
|
bash .circleci/run_tests.sh
|
||||||
|
|
||||||
|
rust-profdata merge -sparse pgcat-*.profraw -o pgcat.profdata
|
||||||
|
|
||||||
|
rust-cov export -ignore-filename-regex="rustc|registry" -Xdemangler=rustfilt -instr-profile=pgcat.profdata --object ./target/debug/pgcat --format lcov > ./lcov.info
|
||||||
|
|
||||||
|
genhtml lcov.info --output-directory cov --prefix $(pwd)
|
||||||
|
|
||||||
|
rm /app/*.profraw
|
||||||
|
rm /app/pgcat.profdata
|
||||||
@@ -18,9 +18,14 @@ def pgcat_start():
|
|||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
try:
|
||||||
if "pgcat" == proc.name():
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
os.kill(proc.pid, signal)
|
if "pgcat" == proc.name():
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
if signal == signal.SIGTERM:
|
||||||
# Returns 0 if pgcat process exists
|
# Returns 0 if pgcat process exists
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
source "https://rubygems.org"
|
source "https://rubygems.org"
|
||||||
|
|
||||||
gem "pg"
|
gem "pg"
|
||||||
gem "activerecord"
|
gem "toml"
|
||||||
|
gem "rspec"
|
||||||
gem "rubocop"
|
gem "rubocop"
|
||||||
gem "toml", "~> 0.3.0"
|
gem "toxiproxy"
|
||||||
|
gem "activerecord"
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ GEM
|
|||||||
tzinfo (~> 2.0)
|
tzinfo (~> 2.0)
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
concurrent-ruby (1.1.10)
|
concurrent-ruby (1.1.10)
|
||||||
|
diff-lcs (1.5.0)
|
||||||
i18n (1.11.0)
|
i18n (1.11.0)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
minitest (5.16.2)
|
minitest (5.16.2)
|
||||||
@@ -24,6 +25,19 @@ GEM
|
|||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
regexp_parser (2.3.1)
|
regexp_parser (2.3.1)
|
||||||
rexml (3.2.5)
|
rexml (3.2.5)
|
||||||
|
rspec (3.11.0)
|
||||||
|
rspec-core (~> 3.11.0)
|
||||||
|
rspec-expectations (~> 3.11.0)
|
||||||
|
rspec-mocks (~> 3.11.0)
|
||||||
|
rspec-core (3.11.0)
|
||||||
|
rspec-support (~> 3.11.0)
|
||||||
|
rspec-expectations (3.11.0)
|
||||||
|
diff-lcs (>= 1.2.0, < 2.0)
|
||||||
|
rspec-support (~> 3.11.0)
|
||||||
|
rspec-mocks (3.11.1)
|
||||||
|
diff-lcs (>= 1.2.0, < 2.0)
|
||||||
|
rspec-support (~> 3.11.0)
|
||||||
|
rspec-support (3.11.0)
|
||||||
rubocop (1.29.0)
|
rubocop (1.29.0)
|
||||||
parallel (~> 1.10)
|
parallel (~> 1.10)
|
||||||
parser (>= 3.1.0.0)
|
parser (>= 3.1.0.0)
|
||||||
@@ -38,19 +52,23 @@ GEM
|
|||||||
ruby-progressbar (1.11.0)
|
ruby-progressbar (1.11.0)
|
||||||
toml (0.3.0)
|
toml (0.3.0)
|
||||||
parslet (>= 1.8.0, < 3.0.0)
|
parslet (>= 1.8.0, < 3.0.0)
|
||||||
|
toxiproxy (2.0.1)
|
||||||
tzinfo (2.0.4)
|
tzinfo (2.0.4)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
unicode-display_width (2.1.0)
|
unicode-display_width (2.1.0)
|
||||||
|
|
||||||
PLATFORMS
|
PLATFORMS
|
||||||
|
aarch64-linux
|
||||||
arm64-darwin-21
|
arm64-darwin-21
|
||||||
x86_64-linux
|
x86_64-linux
|
||||||
|
|
||||||
DEPENDENCIES
|
DEPENDENCIES
|
||||||
activerecord
|
activerecord
|
||||||
pg
|
pg
|
||||||
|
rspec
|
||||||
rubocop
|
rubocop
|
||||||
toml (~> 0.3.0)
|
toml
|
||||||
|
toxiproxy
|
||||||
|
|
||||||
BUNDLED WITH
|
BUNDLED WITH
|
||||||
2.3.7
|
2.3.21
|
||||||
|
|||||||
82
tests/ruby/helpers/pg_instance.rb
Normal file
82
tests/ruby/helpers/pg_instance.rb
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
require 'pg'
|
||||||
|
require 'toxiproxy'
|
||||||
|
|
||||||
|
class PgInstance
|
||||||
|
attr_reader :port
|
||||||
|
attr_reader :username
|
||||||
|
attr_reader :password
|
||||||
|
attr_reader :database_name
|
||||||
|
|
||||||
|
def initialize(port, username, password, database_name)
|
||||||
|
@original_port = port
|
||||||
|
@toxiproxy_port = 10000 + port.to_i
|
||||||
|
@port = @toxiproxy_port
|
||||||
|
|
||||||
|
@username = username
|
||||||
|
@password = password
|
||||||
|
@database_name = database_name
|
||||||
|
@toxiproxy_name = "database_#{@original_port}"
|
||||||
|
Toxiproxy.populate([{
|
||||||
|
name: @toxiproxy_name,
|
||||||
|
listen: "0.0.0.0:#{@toxiproxy_port}",
|
||||||
|
upstream: "localhost:#{@original_port}",
|
||||||
|
}])
|
||||||
|
|
||||||
|
# Toxiproxy server will outlive our PgInstance objects
|
||||||
|
# so we want to destroy our proxies before exiting
|
||||||
|
# Ruby finalizer is ideal for doing this
|
||||||
|
ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
|
||||||
|
end
|
||||||
|
|
||||||
|
def with_connection
|
||||||
|
conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
|
||||||
|
yield conn
|
||||||
|
ensure
|
||||||
|
conn&.close
|
||||||
|
end
|
||||||
|
|
||||||
|
def reset
|
||||||
|
reset_toxics
|
||||||
|
reset_stats
|
||||||
|
end
|
||||||
|
|
||||||
|
def toxiproxy
|
||||||
|
Toxiproxy[@toxiproxy_name]
|
||||||
|
end
|
||||||
|
|
||||||
|
def take_down
|
||||||
|
if block_given?
|
||||||
|
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
|
||||||
|
else
|
||||||
|
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def add_latency(latency)
|
||||||
|
if block_given?
|
||||||
|
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
|
||||||
|
else
|
||||||
|
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def delete_proxy
|
||||||
|
Toxiproxy[@toxiproxy_name].delete
|
||||||
|
end
|
||||||
|
|
||||||
|
def reset_toxics
|
||||||
|
Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
|
||||||
|
end
|
||||||
|
|
||||||
|
def reset_stats
|
||||||
|
with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
|
||||||
|
end
|
||||||
|
|
||||||
|
def count_query(query)
|
||||||
|
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
|
||||||
|
end
|
||||||
|
|
||||||
|
def count_select_1_plus_2
|
||||||
|
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
|
||||||
|
end
|
||||||
|
end
|
||||||
100
tests/ruby/helpers/pgcat_helper.rb
Normal file
100
tests/ruby/helpers/pgcat_helper.rb
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
require 'json'
|
||||||
|
require 'ostruct'
|
||||||
|
require_relative 'pgcat_process'
|
||||||
|
require_relative 'pg_instance'
|
||||||
|
|
||||||
|
module Helpers
|
||||||
|
module Pgcat
|
||||||
|
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||||
|
user = {
|
||||||
|
"password" => "sharding_user",
|
||||||
|
"pool_size" => pool_size,
|
||||||
|
"statement_timeout" => 0,
|
||||||
|
"username" => "sharding_user"
|
||||||
|
}
|
||||||
|
|
||||||
|
pgcat = PgcatProcess.new("info")
|
||||||
|
primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||||
|
primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1")
|
||||||
|
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
|
||||||
|
|
||||||
|
pgcat_cfg = pgcat.current_config
|
||||||
|
pgcat_cfg["pools"] = {
|
||||||
|
"#{pool_name}" => {
|
||||||
|
"default_role" => "any",
|
||||||
|
"pool_mode" => pool_mode,
|
||||||
|
"primary_reads_enabled" => false,
|
||||||
|
"query_parser_enabled" => false,
|
||||||
|
"sharding_function" => "pg_bigint_hash",
|
||||||
|
"shards" => {
|
||||||
|
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] },
|
||||||
|
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
||||||
|
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
||||||
|
},
|
||||||
|
"users" => { "0" => user }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pgcat.update_config(pgcat_cfg)
|
||||||
|
|
||||||
|
pgcat.start
|
||||||
|
pgcat.wait_until_ready
|
||||||
|
|
||||||
|
OpenStruct.new.tap do |struct|
|
||||||
|
struct.pgcat = pgcat
|
||||||
|
struct.shards = [primary0, primary1, primary2]
|
||||||
|
struct.all_databases = [primary0, primary1, primary2]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||||
|
user = {
|
||||||
|
"password" => "sharding_user",
|
||||||
|
"pool_size" => pool_size,
|
||||||
|
"statement_timeout" => 0,
|
||||||
|
"username" => "sharding_user"
|
||||||
|
}
|
||||||
|
|
||||||
|
pgcat = PgcatProcess.new("info")
|
||||||
|
pgcat_cfg = pgcat.current_config
|
||||||
|
|
||||||
|
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||||
|
replica0 = PgInstance.new(7432, user["username"], user["password"], "shard0")
|
||||||
|
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
|
||||||
|
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
|
||||||
|
|
||||||
|
# Main proxy configs
|
||||||
|
pgcat_cfg["pools"] = {
|
||||||
|
"#{pool_name}" => {
|
||||||
|
"default_role" => "any",
|
||||||
|
"pool_mode" => pool_mode,
|
||||||
|
"primary_reads_enabled" => false,
|
||||||
|
"query_parser_enabled" => false,
|
||||||
|
"sharding_function" => "pg_bigint_hash",
|
||||||
|
"shards" => {
|
||||||
|
"0" => {
|
||||||
|
"database" => "shard0",
|
||||||
|
"servers" => [
|
||||||
|
["localhost", primary.port.to_s, "primary"],
|
||||||
|
["localhost", replica0.port.to_s, "replica"],
|
||||||
|
["localhost", replica1.port.to_s, "replica"],
|
||||||
|
["localhost", replica2.port.to_s, "replica"]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"users" => { "0" => user }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pgcat_cfg["general"]["port"] = pgcat.port
|
||||||
|
pgcat.update_config(pgcat_cfg)
|
||||||
|
pgcat.start
|
||||||
|
pgcat.wait_until_ready
|
||||||
|
|
||||||
|
OpenStruct.new.tap do |struct|
|
||||||
|
struct.pgcat = pgcat
|
||||||
|
struct.primary = primary
|
||||||
|
struct.replicas = [replica0, replica1, replica2]
|
||||||
|
struct.all_databases = [primary, replica0, replica1, replica2]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
116
tests/ruby/helpers/pgcat_process.rb
Normal file
116
tests/ruby/helpers/pgcat_process.rb
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
require 'pg'
|
||||||
|
require 'toml'
|
||||||
|
require 'fileutils'
|
||||||
|
require 'securerandom'
|
||||||
|
|
||||||
|
class PgcatProcess
|
||||||
|
attr_reader :port
|
||||||
|
attr_reader :pid
|
||||||
|
|
||||||
|
def self.finalize(pid, log_filename, config_filename)
|
||||||
|
`kill #{pid}`
|
||||||
|
File.delete(config_filename) if File.exist?(config_filename)
|
||||||
|
File.delete(log_filename) if File.exist?(log_filename)
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize(log_level)
|
||||||
|
@env = {"RUST_LOG" => log_level}
|
||||||
|
@port = rand(20000..32760)
|
||||||
|
@log_level = log_level
|
||||||
|
@log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log"
|
||||||
|
@config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml"
|
||||||
|
|
||||||
|
@command = "../../target/debug/pgcat #{@config_filename}"
|
||||||
|
|
||||||
|
FileUtils.cp("../../pgcat.toml", @config_filename)
|
||||||
|
cfg = current_config
|
||||||
|
cfg["general"]["port"] = @port.to_i
|
||||||
|
cfg["general"]["enable_prometheus_exporter"] = false
|
||||||
|
|
||||||
|
update_config(cfg)
|
||||||
|
end
|
||||||
|
|
||||||
|
def logs
|
||||||
|
File.read(@log_filename)
|
||||||
|
end
|
||||||
|
|
||||||
|
def update_config(config_hash)
|
||||||
|
@original_config = current_config
|
||||||
|
output_to_write = TOML::Generator.new(config_hash).body
|
||||||
|
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,')
|
||||||
|
File.write(@config_filename, output_to_write)
|
||||||
|
end
|
||||||
|
|
||||||
|
def current_config
|
||||||
|
old_cfg = File.read(@config_filename)
|
||||||
|
loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",')
|
||||||
|
TOML.load(loadable_string)
|
||||||
|
end
|
||||||
|
|
||||||
|
def reload_config
|
||||||
|
`kill -s HUP #{@pid}`
|
||||||
|
sleep 0.1
|
||||||
|
end
|
||||||
|
|
||||||
|
def start
|
||||||
|
raise StandardError, "Process is already started" unless @pid.nil?
|
||||||
|
@pid = Process.spawn(@env, @command, err: @log_filename, out: @log_filename)
|
||||||
|
ObjectSpace.define_finalizer(@log_filename, proc { PgcatProcess.finalize(@pid, @log_filename, @config_filename) })
|
||||||
|
|
||||||
|
return self
|
||||||
|
end
|
||||||
|
|
||||||
|
def wait_until_ready
|
||||||
|
exc = nil
|
||||||
|
10.times do
|
||||||
|
PG::connect(example_connection_string).close
|
||||||
|
|
||||||
|
return self
|
||||||
|
rescue => e
|
||||||
|
exc = e
|
||||||
|
sleep(0.5)
|
||||||
|
end
|
||||||
|
puts exc
|
||||||
|
raise StandardError, "Process #{@pid} never became ready. Logs #{logs}"
|
||||||
|
end
|
||||||
|
|
||||||
|
# Stops the pgcat process with SIGTERM. A no-op if it was never started.
#
# Uses Process.kill instead of shelling out to `kill`; a process that
# already exited is tolerated to match the old shell-out's behavior of
# ignoring failures.
def stop
  return if @pid.nil?
  Process.kill("TERM", @pid)
  # Give the process a moment to exit before files get cleaned up.
  sleep 0.1
rescue Errno::ESRCH
  # Process already gone — nothing to do.
end
|
||||||
|
|
||||||
|
# Stops the process and removes its temporary config and log files.
def shutdown
  stop
  [@config_filename, @log_filename].each do |path|
    File.delete(path) if File.exist?(path)
  end
end
|
||||||
|
|
||||||
|
# Connection string for pgcat's virtual admin database, using the admin
# credentials from the live config file.
def admin_connection_string
  general = current_config["general"]
  admin_user = general["admin_username"]
  admin_pass = general["admin_password"]

  "postgresql://#{admin_user}:#{admin_pass}@0.0.0.0:#{@port}/pgcat"
end
|
||||||
|
|
||||||
|
# Connection string for a regular pool, resolving the given username's
# password from the live config.
#
# Raises StandardError with a descriptive message when the username is
# not configured for the pool (previously this surfaced as an opaque
# NoMethodError on nil).
def connection_string(pool_name, username)
  cfg = current_config

  # Users are keyed by stringified index; find the entry matching the name.
  _key, user_obj = cfg["pools"][pool_name]["users"].detect { |_k, user| user["username"] == username }
  raise StandardError, "User #{username} not found in pool #{pool_name}" if user_obj.nil?

  "postgresql://#{username}:#{user_obj["password"]}@0.0.0.0:#{@port}/#{pool_name}"
end
|
||||||
|
|
||||||
|
# Connection string for the first configured pool using its first user —
# handy for readiness probes that just need any valid credentials.
def example_connection_string
  cfg = current_config
  pool_name = cfg["pools"].keys.first

  first_user = cfg["pools"][pool_name]["users"]["0"]

  "postgresql://#{first_user["username"]}:#{first_user["password"]}@0.0.0.0:#{@port}/#{pool_name}"
end
|
||||||
|
end
|
||||||
61
tests/ruby/load_balancing_spec.rb
Normal file
61
tests/ruby/load_balancing_spec.rb
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
# frozen_string_literal: true
require_relative 'spec_helper'

describe "Load Balancing" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }

  after do
    processes.all_databases.map(&:reset)
    processes.pgcat.shutdown
  end

  context "under regular circumstances" do
    it "balances query volume between all instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

      query_count = QUERY_COUNT
      expected_share = query_count / processes.all_databases.count
      failed_count = 0

      query_count.times do
        conn.async_exec("SELECT 1 + 2")
      rescue
        failed_count += 1
      end

      expect(failed_count).to eq(0)
      # Each backend should have received roughly its fair share of queries.
      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
      end
    end
  end

  context "when some replicas are down" do
    it "balances query volume between working instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
      failed_count = 0

      processes[:replicas][0].take_down do
        processes[:replicas][1].take_down do
          QUERY_COUNT.times do
            conn.async_exec("SELECT 1 + 2")
          rescue
            # A failed query means pgcat handed us a dead server; reconnect
            # and count the failure — exactly two are expected (one per
            # downed replica, on first discovery).
            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
            failed_count += 1
          end
        end
      end

      expect(failed_count).to eq(2)
      processes.all_databases.each do |instance|
        queries_routed = instance.count_select_1_plus_2
        if processes.replicas[0..1].include?(instance)
          expect(queries_routed).to eq(0)
        else
          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end
  end
end
|
||||||
|
|
||||||
193
tests/ruby/misc_spec.rb
Normal file
193
tests/ruby/misc_spec.rb
Normal file
@@ -0,0 +1,193 @@
|
|||||||
|
# frozen_string_literal: true
require_relative 'spec_helper'

describe "Miscellaneous" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }

  after do
    processes.all_databases.map(&:reset)
    processes.pgcat.shutdown
  end

  describe "Extended Protocol handling" do
    it "does not send packets that client does not expect during extended protocol sequence" do
      new_configs = processes.pgcat.current_config

      # Shrink timeouts and pool size so checkout errors happen quickly.
      new_configs["general"]["connect_timeout"] = 500
      new_configs["general"]["ban_time"] = 1
      new_configs["general"]["shutdown_timeout"] = 1
      new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1

      processes.pgcat.update_config(new_configs)
      processes.pgcat.reload_config

      # Saturate the single-connection pool with long-running queries.
      25.times do
        Thread.new do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
        ensure
          conn&.close
        end
      end

      sleep(0.5)
      conn_under_test = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      stdout, stderr = with_captured_stdout_stderr do
        15.times do |i|
          conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
          conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
          sleep 1
        end
      end

      # libpq logs this to stderr when the pooler sends packets out of turn.
      raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
    end
  end

  describe "Pool recycling after config reload" do
    let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }

    it "should update pools for new clients and clients that are no longer in transaction" do
      server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      server_conn.async_exec("BEGIN")

      # No config change yet, client should see old configs.
      current_database_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
      expect(current_database_from_pg).to eq('shard0')

      # Swap shards 0 and 1 in the config.
      new_config = processes.pgcat.current_config
      shard0 = new_config["pools"]["sharded_db"]["shards"]["0"]
      shard1 = new_config["pools"]["sharded_db"]["shards"]["1"]
      new_config["pools"]["sharded_db"]["shards"]["0"] = shard1
      new_config["pools"]["sharded_db"]["shards"]["1"] = shard0

      # Reload config.
      processes.pgcat.update_config(new_config)
      processes.pgcat.reload_config
      sleep 0.5

      # Config changed but transaction is in progress; client keeps old configs.
      current_database_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
      expect(current_database_from_pg).to eq('shard0')
      server_conn.async_exec("COMMIT")

      # Transaction finished; client should now get new configs.
      current_database_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
      expect(current_database_from_pg).to eq('shard1')

      # A fresh connection should also get new configs.
      server_conn.close()
      server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      current_database_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
      expect(current_database_from_pg).to eq('shard1')
    end
  end

  describe "Clients closing connection in the middle of transaction" do
    it "sends a rollback to the server" do
      conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      conn.async_exec("SET SERVER ROLE to 'primary'")
      conn.async_exec("BEGIN")
      conn.close

      expect(processes.primary.count_query("ROLLBACK")).to eq(1)
    end
  end

  describe "Server version reporting" do
    it "reports correct version for normal and admin databases" do
      server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      expect(server_conn.server_version).not_to eq(0)
      server_conn.close

      admin_conn = PG::connect(processes.pgcat.admin_connection_string)
      expect(admin_conn.server_version).not_to eq(0)
      admin_conn.close
    end
  end

  describe "State clearance" do
    context "session mode" do
      let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }

      it "Clears state before connection checkin" do
        # Both modes of operation should not raise
        # ERROR: prepared statement "prepared_q" already exists
        15.times do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
          conn.close
        end

        conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
        conn.async_exec("SET statement_timeout to 1000")
        current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
        expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s")
        conn.close
      end

      it "Does not send DISCARD ALL unless necessary" do
        # Plain reads should not trigger a DISCARD ALL on checkin.
        10.times do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.close
        end

        expect(processes.primary.count_query("DISCARD ALL")).to eq(0)

        # Sessions that mutate state (SET) should be discarded on checkin.
        10.times do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.async_exec("SET statement_timeout to 5000")
          conn.close
        end

        expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
      end
    end

    context "transaction mode" do
      let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }

      it "Clears state before connection checkin" do
        # Both modes of operation should not raise
        # ERROR: prepared statement "prepared_q" already exists
        15.times do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
          conn.close
        end

        15.times do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.prepare("prepared_q", "SELECT $1")
          conn.close
        end
      end

      it "Does not send DISCARD ALL unless necessary" do
        # Plain reads (simple or extended protocol) should not trigger DISCARD ALL.
        10.times do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.exec_params("SELECT $1", [1])
          conn.close
        end

        expect(processes.primary.count_query("DISCARD ALL")).to eq(0)

        # Sessions that mutate state (SET) should be discarded on checkin.
        10.times do
          conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.async_exec("SET statement_timeout to 5000")
          conn.close
        end

        expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
      end
    end
  end
end
|
||||||
81
tests/ruby/routing_spec.rb
Normal file
81
tests/ruby/routing_spec.rb
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
# frozen_string_literal: true
require_relative 'spec_helper'

describe "Routing" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }

  after do
    processes.all_databases.map(&:reset)
    processes.pgcat.shutdown
  end

  describe "SET ROLE" do
    context "primary" do
      it "routes queries only to primary" do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        conn.async_exec("SET SERVER ROLE to 'primary'")

        query_count = 30
        failed_count = 0

        query_count.times do
          conn.async_exec("SELECT 1 + 2")
        rescue
          failed_count += 1
        end

        expect(failed_count).to eq(0)
        # Replicas should never see any of the traffic.
        processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
          expect(instance_share).to eq(0)
        end

        expect(processes.primary.count_select_1_plus_2).to eq(query_count)
      end
    end

    context "replica" do
      it "routes queries only to replicas" do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        conn.async_exec("SET SERVER ROLE to 'replica'")

        expected_share = QUERY_COUNT / processes.replicas.count
        failed_count = 0

        QUERY_COUNT.times do
          conn.async_exec("SELECT 1 + 2")
        rescue
          failed_count += 1
        end

        expect(failed_count).to eq(0)

        # Traffic should be split roughly evenly across replicas only.
        processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end

        expect(processes.primary.count_select_1_plus_2).to eq(0)
      end
    end

    context "any" do
      it "routes queries to all instances" do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        conn.async_exec("SET SERVER ROLE to 'any'")

        expected_share = QUERY_COUNT / processes.all_databases.count
        failed_count = 0

        QUERY_COUNT.times do
          conn.async_exec("SELECT 1 + 2")
        rescue
          failed_count += 1
        end

        expect(failed_count).to eq(0)

        # Primary and replicas all share the load.
        processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end
  end
end
|
||||||
21
tests/ruby/spec_helper.rb
Normal file
21
tests/ruby/spec_helper.rb
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'pg'
|
||||||
|
require_relative 'helpers/pgcat_helper'
|
||||||
|
|
||||||
|
# Number of probe queries each load-balancing spec issues.
QUERY_COUNT = 300
# Acceptable relative deviation from a perfectly even query split.
MARGIN_OF_ERROR = 0.30
|
||||||
|
|
||||||
|
# Runs the given block with STDOUT and STDERR redirected into temp files,
# then returns [captured_stdout, captured_stderr] as strings. The real
# streams are restored even if the block raises.
def with_captured_stdout_stderr
  saved_stdout = STDOUT.clone
  saved_stderr = STDERR.clone
  STDOUT.reopen("/tmp/out.txt", "w+")
  STDERR.reopen("/tmp/err.txt", "w+")
  # Flush immediately so the files reflect output written right before a crash.
  STDOUT.sync = true
  STDERR.sync = true
  yield
  return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
ensure
  STDOUT.reopen(saved_stdout)
  STDERR.reopen(saved_stderr)
end
|
||||||
@@ -1,93 +1,6 @@
|
|||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
require 'active_record'
|
|
||||||
require 'pg'
|
require 'pg'
|
||||||
require 'toml'
|
require 'active_record'
|
||||||
|
|
||||||
$stdout.sync = true
|
|
||||||
$stderr.sync = true
|
|
||||||
|
|
||||||
class ConfigEditor
|
|
||||||
def initialize
|
|
||||||
@original_config_text = File.read('../../.circleci/pgcat.toml')
|
|
||||||
text_to_load = @original_config_text.gsub("5432", "\"5432\"")
|
|
||||||
|
|
||||||
@original_configs = TOML.load(text_to_load)
|
|
||||||
end
|
|
||||||
|
|
||||||
def original_configs
|
|
||||||
TOML.load(TOML::Generator.new(@original_configs).body)
|
|
||||||
end
|
|
||||||
|
|
||||||
def with_modified_configs(new_configs)
|
|
||||||
text_to_write = TOML::Generator.new(new_configs).body
|
|
||||||
text_to_write = text_to_write.gsub("\"5432\"", "5432")
|
|
||||||
File.write('../../.circleci/pgcat.toml', text_to_write)
|
|
||||||
yield
|
|
||||||
ensure
|
|
||||||
File.write('../../.circleci/pgcat.toml', @original_config_text)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def with_captured_stdout_stderr
|
|
||||||
sout = STDOUT.clone
|
|
||||||
serr = STDERR.clone
|
|
||||||
STDOUT.reopen("/tmp/out.txt", "w+")
|
|
||||||
STDERR.reopen("/tmp/err.txt", "w+")
|
|
||||||
STDOUT.sync = true
|
|
||||||
STDERR.sync = true
|
|
||||||
yield
|
|
||||||
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
|
|
||||||
ensure
|
|
||||||
STDOUT.reopen(sout)
|
|
||||||
STDERR.reopen(serr)
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
def test_extended_protocol_pooler_errors
|
|
||||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
|
||||||
|
|
||||||
conf_editor = ConfigEditor.new
|
|
||||||
new_configs = conf_editor.original_configs
|
|
||||||
|
|
||||||
# shorter timeouts
|
|
||||||
new_configs["general"]["connect_timeout"] = 500
|
|
||||||
new_configs["general"]["ban_time"] = 1
|
|
||||||
new_configs["general"]["shutdown_timeout"] = 1
|
|
||||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
|
||||||
new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1
|
|
||||||
|
|
||||||
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
|
|
||||||
|
|
||||||
conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db"
|
|
||||||
10.times do
|
|
||||||
Thread.new do
|
|
||||||
conn = PG::connect(conn_str)
|
|
||||||
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
|
|
||||||
ensure
|
|
||||||
conn&.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
sleep(0.5)
|
|
||||||
conn_under_test = PG::connect(conn_str)
|
|
||||||
stdout, stderr = with_captured_stdout_stderr do
|
|
||||||
5.times do |i|
|
|
||||||
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
|
|
||||||
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
|
|
||||||
sleep 1
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
|
|
||||||
puts "Pool checkout errors not breaking clients passed"
|
|
||||||
ensure
|
|
||||||
sleep 1
|
|
||||||
admin_conn.async_exec("RELOAD") # Reset state
|
|
||||||
conn_under_test&.close
|
|
||||||
end
|
|
||||||
|
|
||||||
test_extended_protocol_pooler_errors
|
|
||||||
|
|
||||||
# Uncomment these two to see all queries.
|
# Uncomment these two to see all queries.
|
||||||
# ActiveRecord.verbose_query_logs = true
|
# ActiveRecord.verbose_query_logs = true
|
||||||
@@ -198,68 +111,3 @@ begin
|
|||||||
rescue ActiveRecord::StatementInvalid
|
rescue ActiveRecord::StatementInvalid
|
||||||
puts 'OK'
|
puts 'OK'
|
||||||
end
|
end
|
||||||
|
|
||||||
# Test evil clients
|
|
||||||
def poorly_behaved_client
|
|
||||||
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
|
||||||
conn.async_exec 'BEGIN'
|
|
||||||
conn.async_exec 'SELECT 1'
|
|
||||||
|
|
||||||
conn.close
|
|
||||||
puts 'Bad client ok'
|
|
||||||
end
|
|
||||||
|
|
||||||
25.times do
|
|
||||||
poorly_behaved_client
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
def test_server_parameters
|
|
||||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
|
||||||
raise StandardError, "Bad server version" if server_conn.server_version == 0
|
|
||||||
server_conn.close
|
|
||||||
|
|
||||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
|
||||||
raise StandardError, "Bad server version" if admin_conn.server_version == 0
|
|
||||||
admin_conn.close
|
|
||||||
|
|
||||||
puts 'Server parameters ok'
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
def test_reload_pool_recycling
|
|
||||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
|
||||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
|
||||||
|
|
||||||
server_conn.async_exec("BEGIN")
|
|
||||||
conf_editor = ConfigEditor.new
|
|
||||||
new_configs = conf_editor.original_configs
|
|
||||||
|
|
||||||
# swap shards
|
|
||||||
new_configs["pools"]["sharded_db"]["shards"]["0"]["database"] = "shard1"
|
|
||||||
new_configs["pools"]["sharded_db"]["shards"]["1"]["database"] = "shard0"
|
|
||||||
|
|
||||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
|
||||||
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
|
|
||||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
|
||||||
server_conn.async_exec("COMMIT;")
|
|
||||||
|
|
||||||
# Transaction finished, client should get new configs
|
|
||||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
|
||||||
server_conn.close()
|
|
||||||
|
|
||||||
# New connection should get new configs
|
|
||||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
|
||||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
|
||||||
|
|
||||||
ensure
|
|
||||||
admin_conn.async_exec("RELOAD") # Go back to old state
|
|
||||||
admin_conn.close
|
|
||||||
server_conn.close
|
|
||||||
puts "Pool Recycling okay!"
|
|
||||||
end
|
|
||||||
|
|
||||||
test_reload_pool_recycling
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -70,23 +70,35 @@ GRANT CONNECT ON DATABASE shard2 TO other_user;
|
|||||||
GRANT CONNECT ON DATABASE some_db TO simple_user;
|
GRANT CONNECT ON DATABASE some_db TO simple_user;
|
||||||
|
|
||||||
\c shard0
|
\c shard0
|
||||||
|
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||||
|
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
GRANT ALL ON TABLE data TO sharding_user;
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO other_user;
|
GRANT ALL ON SCHEMA public TO other_user;
|
||||||
GRANT ALL ON TABLE data TO other_user;
|
GRANT ALL ON TABLE data TO other_user;
|
||||||
|
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||||
|
|
||||||
\c shard1
|
\c shard1
|
||||||
|
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||||
|
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
GRANT ALL ON TABLE data TO sharding_user;
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO other_user;
|
GRANT ALL ON SCHEMA public TO other_user;
|
||||||
GRANT ALL ON TABLE data TO other_user;
|
GRANT ALL ON TABLE data TO other_user;
|
||||||
|
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||||
|
|
||||||
|
|
||||||
\c shard2
|
\c shard2
|
||||||
|
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||||
|
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
GRANT ALL ON TABLE data TO sharding_user;
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO other_user;
|
GRANT ALL ON SCHEMA public TO other_user;
|
||||||
GRANT ALL ON TABLE data TO other_user;
|
GRANT ALL ON TABLE data TO other_user;
|
||||||
|
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||||
|
|
||||||
\c some_db
|
\c some_db
|
||||||
|
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||||
|
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO simple_user;
|
||||||
GRANT ALL ON SCHEMA public TO simple_user;
|
GRANT ALL ON SCHEMA public TO simple_user;
|
||||||
GRANT ALL ON TABLE data TO simple_user;
|
GRANT ALL ON TABLE data TO simple_user;
|
||||||
|
|||||||
Reference in New Issue
Block a user